You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/07/11 04:04:21 UTC
zookeeper git commit: ZOOKEEPER-3084: Exit when ZooKeeper cannot bind
to the leader election port
Repository: zookeeper
Updated Branches:
refs/heads/master 5fdd70ac4 -> c2e7ed1e6
ZOOKEEPER-3084: Exit when ZooKeeper cannot bind to the leader election port
Author: Fangmin Lyu <al...@fb.com>
Reviewers: Andor Molnár <an...@apache.org>, Benjamin Reed <br...@apache.org>, Norbert Kalmar <nk...@cloudera.com>, Michael Han <ha...@apache.org>
Closes #562 from lvfangmin/ZOOKEEPER-3084
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/c2e7ed1e
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/c2e7ed1e
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/c2e7ed1e
Branch: refs/heads/master
Commit: c2e7ed1e6f8f2de48778db7f3d63f9629c086ea8
Parents: 5fdd70a
Author: Fangmin Lyu <al...@fb.com>
Authored: Tue Jul 10 21:04:17 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Tue Jul 10 21:04:17 2018 -0700
----------------------------------------------------------------------
.../server/quorum/QuorumCnxManager.java | 107 ++++++++++---------
1 file changed, 58 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c2e7ed1e/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 09da63a..705b846 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -58,11 +59,11 @@ import org.slf4j.LoggerFactory;
* maintains one connection for every pair of servers. The tricky part is to
* guarantee that there is exactly one connection for every pair of servers that
* are operating correctly and that can communicate over the network.
- *
+ *
* If two servers try to start a connection concurrently, then the connection
* manager uses a very simple tie-breaking mechanism to decide which connection
- * to drop based on the IP addressed of the two parties.
- *
+ * to drop based on the IP addresses of the two parties.
+ *
* For every peer, the manager maintains a queue of messages to send. If the
* connection to any particular peer drops, then the sender thread puts the
* message back on the list. As this implementation currently uses a queue
@@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
* message to the tail of the queue, thus changing the order of messages.
* Although this is not a problem for the leader election, it could be a problem
* when consolidating peer communication. This is to be verified, though.
- *
+ *
*/
public class QuorumCnxManager {
@@ -85,7 +86,7 @@ public class QuorumCnxManager {
static final int SEND_CAPACITY = 1;
static final int PACKETMAXSIZE = 1024 * 512;
-
+
/*
* Negative counter for observer server ids.
*/
@@ -103,9 +104,9 @@ public class QuorumCnxManager {
static public final int maxBuffer = 2048;
/*
- * Connection time out value in milliseconds
+ * Connection time out value in milliseconds
*/
-
+
private int cnxTO = 5000;
final QuorumPeer self;
@@ -255,12 +256,12 @@ public class QuorumCnxManager {
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
-
+
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
if(cnxToValue != null){
this.cnxTO = Integer.parseInt(cnxToValue);
}
-
+
this.self = self;
this.mySid = mySid;
@@ -313,7 +314,7 @@ public class QuorumCnxManager {
/**
* Invokes initiateConnection for testing purposes
- *
+ *
* @param sid
*/
public void testInitiateConnection(long sid) throws Exception {
@@ -436,24 +437,24 @@ public class QuorumCnxManager {
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
-
+
if(vsw != null)
vsw.finish();
-
+
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
SEND_CAPACITY));
-
+
sw.start();
rw.start();
-
- return true;
-
+
+ return true;
+
}
return false;
}
-
-
+
+
/**
* If this server receives a connection request, then it gives up on the new
* connection if it wins. Notice that it checks whether it has a connection
@@ -575,7 +576,7 @@ public class QuorumCnxManager {
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
-
+
if (vsw != null) {
vsw.finish();
}
@@ -584,14 +585,14 @@ public class QuorumCnxManager {
queueSendMap.putIfAbsent(sid,
new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
-
+
sw.start();
rw.start();
}
}
/**
- * Processes invoke this message to queue a message to send. Currently,
+ * Processes invoke this method to queue a message to send. Currently,
* only leader election uses it.
*/
public void toSend(Long sid, ByteBuffer b) {
@@ -617,13 +618,13 @@ public class QuorumCnxManager {
addToSendQueue(bq, b);
}
connectOne(sid);
-
+
}
}
-
+
/**
* Try to establish a connection to server with id sid using its electionAddr.
- *
+ *
* @param sid server id
* @return boolean success indication
*/
@@ -666,12 +667,12 @@ public class QuorumCnxManager {
closeSocket(sock);
return false;
}
-
+
}
-
+
/**
* Try to establish a connection to server with id sid.
- *
+ *
* @param sid server id
*/
synchronized void connectOne(long sid){
@@ -705,22 +706,22 @@ public class QuorumCnxManager {
}
}
}
-
-
+
+
/**
* Try to establish a connection with each server if one
* doesn't exist.
*/
-
+
public void connectAll(){
long sid;
for(Enumeration<Long> en = queueSendMap.keys();
en.hasMoreElements();){
sid = en.nextElement();
connectOne(sid);
- }
+ }
}
-
+
/**
* Check if all queues are empty, indicating that all messages have been delivered.
@@ -743,7 +744,7 @@ public class QuorumCnxManager {
shutdown = true;
LOG.debug("Halting listener");
listener.halt();
-
+
// Wait for the listener to terminate.
try {
listener.join();
@@ -759,7 +760,7 @@ public class QuorumCnxManager {
inprogressConnections.clear();
resetConnectionThreadCount();
}
-
+
/**
* A soft halt simply finishes workers.
*/
@@ -772,7 +773,7 @@ public class QuorumCnxManager {
/**
* Helper method to set socket options.
- *
+ *
* @param sock
* Reference to socket
*/
@@ -784,7 +785,7 @@ public class QuorumCnxManager {
/**
* Helper method to close a socket.
- *
+ *
* @param sock
* Reference to socket
*/
@@ -842,6 +843,7 @@ public class QuorumCnxManager {
int numRetries = 0;
InetSocketAddress addr;
Socket client = null;
+ IOException exitException = null;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
@@ -886,6 +888,7 @@ public class QuorumCnxManager {
break;
}
LOG.error("Exception while listening", e);
+ exitException = e;
numRetries++;
try {
ss.close();
@@ -905,6 +908,12 @@ public class QuorumCnxManager {
+ "I won't be able to participate in leader "
+ "election any longer: "
+ self.getElectionAddress());
+ if (exitException instanceof BindException) {
+ // Once the listener thread exits, this host can no longer
+ // join the quorum. That is a severe error we cannot recover
+ // from, so we exit the process.
+ System.exit(14);
+ }
} else if (ss != null) {
// Clean up for shutdown.
try {
@@ -948,7 +957,7 @@ public class QuorumCnxManager {
/**
* An instance of this thread receives messages to send
* through a queue and sends them to the server sid.
- *
+ *
* @param sock
* Socket to remote peer
* @param sid
@@ -975,23 +984,23 @@ public class QuorumCnxManager {
/**
* Returns RecvWorker that pairs up with this SendWorker.
- *
- * @return RecvWorker
+ *
+ * @return RecvWorker
*/
synchronized RecvWorker getRecvWorker(){
return recvWorker;
}
-
+
synchronized boolean finish() {
LOG.debug("Calling finish for " + sid);
-
+
if(!running){
/*
- * Avoids running finish() twice.
+ * Avoids running finish() twice.
*/
return running;
}
-
+
running = false;
closeSocket(sock);
@@ -1006,7 +1015,7 @@ public class QuorumCnxManager {
threadCnt.decrementAndGet();
return running;
}
-
+
synchronized void send(ByteBuffer b) throws IOException {
byte[] msgBytes = new byte[b.capacity()];
try {
@@ -1050,7 +1059,7 @@ public class QuorumCnxManager {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
-
+
try {
while (running && !shutdown && sock != null) {
@@ -1111,20 +1120,20 @@ public class QuorumCnxManager {
running = false;
}
}
-
+
/**
* Shuts down this worker
- *
+ *
* @return boolean Value of variable running
*/
synchronized boolean finish() {
if(!running){
/*
- * Avoids running finish() twice.
+ * Avoids running finish() twice.
*/
return running;
}
- running = false;
+ running = false;
this.interrupt();
threadCnt.decrementAndGet();