You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/07/11 04:04:21 UTC

zookeeper git commit: ZOOKEEPER-3084: Exit when ZooKeeper cannot bind to the leader election port

Repository: zookeeper
Updated Branches:
  refs/heads/master 5fdd70ac4 -> c2e7ed1e6


ZOOKEEPER-3084: Exit when ZooKeeper cannot bind to the leader election port

Author: Fangmin Lyu <al...@fb.com>

Reviewers: Andor Molnár <an...@apache.org>, Benjamin Reed <br...@apache.org>, Norbert Kalmar <nk...@cloudera.com>, Michael Han <ha...@apache.org>

Closes #562 from lvfangmin/ZOOKEEPER-3084


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/c2e7ed1e
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/c2e7ed1e
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/c2e7ed1e

Branch: refs/heads/master
Commit: c2e7ed1e6f8f2de48778db7f3d63f9629c086ea8
Parents: 5fdd70a
Author: Fangmin Lyu <al...@fb.com>
Authored: Tue Jul 10 21:04:17 2018 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Tue Jul 10 21:04:17 2018 -0700

----------------------------------------------------------------------
 .../server/quorum/QuorumCnxManager.java         | 107 ++++++++++---------
 1 file changed, 58 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/c2e7ed1e/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 09da63a..705b846 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -58,11 +59,11 @@ import org.slf4j.LoggerFactory;
  * maintains one connection for every pair of servers. The tricky part is to
  * guarantee that there is exactly one connection for every pair of servers that
  * are operating correctly and that can communicate over the network.
- * 
+ *
  * If two servers try to start a connection concurrently, then the connection
  * manager uses a very simple tie-breaking mechanism to decide which connection
- * to drop based on the IP addressed of the two parties. 
- * 
+ * to drop based on the IP addressed of the two parties.
+ *
  * For every peer, the manager maintains a queue of messages to send. If the
  * connection to any particular peer drops, then the sender thread puts the
  * message back on the list. As this implementation currently uses a queue
@@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
  * message to the tail of the queue, thus changing the order of messages.
  * Although this is not a problem for the leader election, it could be a problem
  * when consolidating peer communication. This is to be verified, though.
- * 
+ *
  */
 
 public class QuorumCnxManager {
@@ -85,7 +86,7 @@ public class QuorumCnxManager {
     static final int SEND_CAPACITY = 1;
 
     static final int PACKETMAXSIZE = 1024 * 512;
-    
+
     /*
      * Negative counter for observer server ids.
      */
@@ -103,9 +104,9 @@ public class QuorumCnxManager {
     static public final int maxBuffer = 2048;
 
     /*
-     * Connection time out value in milliseconds 
+     * Connection time out value in milliseconds
      */
-    
+
     private int cnxTO = 5000;
 
     final QuorumPeer self;
@@ -255,12 +256,12 @@ public class QuorumCnxManager {
         this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
         this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
-        
+
         String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
         if(cnxToValue != null){
             this.cnxTO = Integer.parseInt(cnxToValue);
         }
-        
+
         this.self = self;
 
         this.mySid = mySid;
@@ -313,7 +314,7 @@ public class QuorumCnxManager {
 
     /**
      * Invokes initiateConnection for testing purposes
-     * 
+     *
      * @param sid
      */
     public void testInitiateConnection(long sid) throws Exception {
@@ -436,24 +437,24 @@ public class QuorumCnxManager {
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
-            
+
             if(vsw != null)
                 vsw.finish();
-            
+
             senderWorkerMap.put(sid, sw);
             queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
                         SEND_CAPACITY));
-            
+
             sw.start();
             rw.start();
-            
-            return true;    
-            
+
+            return true;
+
         }
         return false;
     }
-    
-    
+
+
     /**
      * If this server receives a connection request, then it gives up on the new
      * connection if it wins. Notice that it checks whether it has a connection
@@ -575,7 +576,7 @@ public class QuorumCnxManager {
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
-            
+
             if (vsw != null) {
                 vsw.finish();
             }
@@ -584,14 +585,14 @@ public class QuorumCnxManager {
 
             queueSendMap.putIfAbsent(sid,
                     new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
-            
+
             sw.start();
             rw.start();
         }
     }
 
     /**
-     * Processes invoke this message to queue a message to send. Currently, 
+     * Processes invoke this message to queue a message to send. Currently,
      * only leader election uses it.
      */
     public void toSend(Long sid, ByteBuffer b) {
@@ -617,13 +618,13 @@ public class QuorumCnxManager {
                  addToSendQueue(bq, b);
              }
              connectOne(sid);
-                
+
         }
     }
-    
+
     /**
      * Try to establish a connection to server with id sid using its electionAddr.
-     * 
+     *
      *  @param sid  server id
      *  @return boolean success indication
      */
@@ -666,12 +667,12 @@ public class QuorumCnxManager {
              closeSocket(sock);
              return false;
          }
-   
+
     }
-    
+
     /**
      * Try to establish a connection to server with id sid.
-     * 
+     *
      *  @param sid  server id
      */
     synchronized void connectOne(long sid){
@@ -705,22 +706,22 @@ public class QuorumCnxManager {
             }
         }
     }
-    
-    
+
+
     /**
      * Try to establish a connection with each server if one
      * doesn't exist.
      */
-    
+
     public void connectAll(){
         long sid;
         for(Enumeration<Long> en = queueSendMap.keys();
             en.hasMoreElements();){
             sid = en.nextElement();
             connectOne(sid);
-        }      
+        }
     }
-    
+
 
     /**
      * Check if all queues are empty, indicating that all messages have been delivered.
@@ -743,7 +744,7 @@ public class QuorumCnxManager {
         shutdown = true;
         LOG.debug("Halting listener");
         listener.halt();
-        
+
         // Wait for the listener to terminate.
         try {
             listener.join();
@@ -759,7 +760,7 @@ public class QuorumCnxManager {
         inprogressConnections.clear();
         resetConnectionThreadCount();
     }
-   
+
     /**
      * A soft halt simply finishes workers.
      */
@@ -772,7 +773,7 @@ public class QuorumCnxManager {
 
     /**
      * Helper method to set socket options.
-     * 
+     *
      * @param sock
      *            Reference to socket
      */
@@ -784,7 +785,7 @@ public class QuorumCnxManager {
 
     /**
      * Helper method to close a socket.
-     * 
+     *
      * @param sock
      *            Reference to socket
      */
@@ -842,6 +843,7 @@ public class QuorumCnxManager {
             int numRetries = 0;
             InetSocketAddress addr;
             Socket client = null;
+            IOException exitException = null;
             while((!shutdown) && (numRetries < 3)){
                 try {
                     ss = new ServerSocket();
@@ -886,6 +888,7 @@ public class QuorumCnxManager {
                         break;
                     }
                     LOG.error("Exception while listening", e);
+                    exitException = e;
                     numRetries++;
                     try {
                         ss.close();
@@ -905,6 +908,12 @@ public class QuorumCnxManager {
                         + "I won't be able to participate in leader "
                         + "election any longer: "
                         + self.getElectionAddress());
+                if (exitException instanceof BindException) {
+                    // After leaving listener thread, the host cannot join the
+                    // quorum anymore, this is a severe error that we cannot
+                    // recover from, so we need to exit
+                    System.exit(14);
+                }
             } else if (ss != null) {
                 // Clean up for shutdown.
                 try {
@@ -948,7 +957,7 @@ public class QuorumCnxManager {
         /**
          * An instance of this thread receives messages to send
          * through a queue and sends them to the server sid.
-         * 
+         *
          * @param sock
          *            Socket to remote peer
          * @param sid
@@ -975,23 +984,23 @@ public class QuorumCnxManager {
 
         /**
          * Returns RecvWorker that pairs up with this SendWorker.
-         * 
-         * @return RecvWorker 
+         *
+         * @return RecvWorker
          */
         synchronized RecvWorker getRecvWorker(){
             return recvWorker;
         }
-                
+
         synchronized boolean finish() {
             LOG.debug("Calling finish for " + sid);
-            
+
             if(!running){
                 /*
-                 * Avoids running finish() twice. 
+                 * Avoids running finish() twice.
                  */
                 return running;
             }
-            
+
             running = false;
             closeSocket(sock);
 
@@ -1006,7 +1015,7 @@ public class QuorumCnxManager {
             threadCnt.decrementAndGet();
             return running;
         }
-        
+
         synchronized void send(ByteBuffer b) throws IOException {
             byte[] msgBytes = new byte[b.capacity()];
             try {
@@ -1050,7 +1059,7 @@ public class QuorumCnxManager {
                 LOG.error("Failed to send last message. Shutting down thread.", e);
                 this.finish();
             }
-            
+
             try {
                 while (running && !shutdown && sock != null) {
 
@@ -1111,20 +1120,20 @@ public class QuorumCnxManager {
                 running = false;
             }
         }
-        
+
         /**
          * Shuts down this worker
-         * 
+         *
          * @return boolean  Value of variable running
          */
         synchronized boolean finish() {
             if(!running){
                 /*
-                 * Avoids running finish() twice. 
+                 * Avoids running finish() twice.
                  */
                 return running;
             }
-            running = false;            
+            running = false;
 
             this.interrupt();
             threadCnt.decrementAndGet();