You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by cn...@apache.org on 2016/06/23 21:45:51 UTC

svn commit: r1750026 - in /zookeeper/branches/branch-3.5: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: cnauroth
Date: Thu Jun 23 21:45:50 2016
New Revision: 1750026

URL: http://svn.apache.org/viewvc?rev=1750026&view=rev
Log:
ZOOKEEPER-2366: Reconfiguration of client port causes a socket leak. (fpj via cnauroth)

Modified:
    zookeeper/branches/branch-3.5/CHANGES.txt
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
    zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
    zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java

Modified: zookeeper/branches/branch-3.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/CHANGES.txt?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.5/CHANGES.txt Thu Jun 23 21:45:50 2016
@@ -165,6 +165,9 @@ BUGFIXES:
   ZOOKEEPER-2380: Deadlock between leader shutdown and forwarding ACK to the
   leader (Arshad Mohammad via cnauroth)
 
+  ZOOKEEPER-2366: Reconfiguration of client port causes a socket leak.
+  (fpj via cnauroth)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-2270: Allow MBeanRegistry to be overridden for better unit tests

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Thu Jun 23 21:45:50 2016
@@ -688,31 +688,42 @@ public class NIOServerCnxnFactory extend
         ss.configureBlocking(false);
         acceptThread = new AcceptThread(ss, addr, selectorThreads);
     }
-   
+
+    private void tryClose(ServerSocketChannel s) {
+        try {
+            s.close();
+        } catch (IOException sse) {
+            LOG.error("Error while closing server socket.", sse);
+        }
+    }
+
     @Override
-    public void reconfigure(InetSocketAddress addr){
+    public void reconfigure(InetSocketAddress addr) {
         ServerSocketChannel oldSS = ss;        
         try {
-           this.ss = ServerSocketChannel.open();
-           ss.socket().setReuseAddress(true);
-           LOG.info("binding to port " + addr);
-           ss.socket().bind(addr);
-           ss.configureBlocking(false);
-           acceptThread.setReconfiguring();
-           oldSS.close();           
-           acceptThread.wakeupSelector();
-           try {
-			  acceptThread.join();
-		   } catch (InterruptedException e) {
-			   LOG.error("Error joining old acceptThread when reconfiguring client port " + e.getMessage());
-		   }
-           acceptThread = new AcceptThread(ss, addr, selectorThreads);
-           acceptThread.start();
+            this.ss = ServerSocketChannel.open();
+            ss.socket().setReuseAddress(true);
+            LOG.info("binding to port " + addr);
+            ss.socket().bind(addr);
+            ss.configureBlocking(false);
+            acceptThread.setReconfiguring();
+            tryClose(oldSS);
+            acceptThread.wakeupSelector();
+            try {
+                acceptThread.join();
+            } catch (InterruptedException e) {
+                LOG.error("Error joining old acceptThread when reconfiguring client port {}",
+                            e.getMessage());
+                Thread.currentThread().interrupt();
+            }
+            acceptThread = new AcceptThread(ss, addr, selectorThreads);
+            acceptThread.start();
         } catch(IOException e) {
-           LOG.error("Error reconfiguring client port to " + addr + " " + e.getMessage());
+            LOG.error("Error reconfiguring client port to {} {}", addr, e.getMessage());
+            tryClose(oldSS);
         }
     }
-    
+
     /** {@inheritDoc} */
     public int getMaxClientCnxnsPerHost() {
         return maxClientCnxns;

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Thu Jun 23 21:45:50 2016
@@ -488,13 +488,17 @@ public class NettyServerCnxnFactory exte
         parentChannel = bootstrap.bind(localAddress);
     }
     
-    public void reconfigure(InetSocketAddress addr) 
-    {  
+    public void reconfigure(InetSocketAddress addr) {
        Channel oldChannel = parentChannel;
-       LOG.info("binding to port " + addr);
-        parentChannel = bootstrap.bind(addr);
-        localAddress = addr;  
-        oldChannel.close();
+       try {
+           LOG.info("binding to port {}", addr);
+           parentChannel = bootstrap.bind(addr);
+           localAddress = addr;
+       } catch (Exception e) {
+           LOG.error("Error while reconfiguring", e);
+       } finally {
+           oldChannel.close();
+       }
     }
     
     @Override

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jun 23 21:45:50 2016
@@ -710,7 +710,7 @@ public class Leader {
        // that different operations wait for different sets of acks, and we still want to enforce
        // that they are committed in order. Currently we only permit one outstanding reconfiguration
        // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
-       // pending all wait for a quorum of old and new config, so its not possible to get enough acks
+       // pending all wait for a quorum of old and new config, so it's not possible to get enough acks
        // for an operation without getting enough acks for preceding ops. But in the future if multiple
        // concurrent reconfigs are allowed, this can happen.
        if (outstandingProposals.containsKey(zxid - 1)) return false;
@@ -751,7 +751,7 @@ public class Leader {
             QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
        
             self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
-       
+
             if (designatedLeader != self.getId()) {
                 allowedToCommit = false;
             }
@@ -1261,7 +1261,7 @@ public class Leader {
         QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
         
         Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());                                         
-        
+
         self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
         if (designatedLeader != self.getId()) {
             allowedToCommit = false;

Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Jun 23 21:45:50 2016
@@ -1728,7 +1728,7 @@ public class QuorumPeer extends ZooKeepe
         writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
     }
    
-    public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE){
+    public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
        InetSocketAddress oldClientAddr = getClientAddress();
 
        // update last committed quorum verifier, write the new config to disk
@@ -1756,8 +1756,8 @@ public class QuorumPeer extends ZooKeepe
                cnxnFactory.reconfigure(myNewQS.clientAddr);
                updateThreadName();
            }
-           
-            boolean roleChange = updateLearnerType(qv);
+
+           boolean roleChange = updateLearnerType(qv);
            boolean leaderChange = false;
            if (suggestedLeaderId != null) {
                // zxid should be non-null too
@@ -1875,7 +1875,9 @@ public class QuorumPeer extends ZooKeepe
     }
 
     private void updateThreadName() {
-        String plain = cnxnFactory != null ? cnxnFactory.getLocalAddress().toString() : "disabled";
+        String plain = cnxnFactory != null ?
+                cnxnFactory.getLocalAddress() != null ?
+                        cnxnFactory.getLocalAddress().toString() : "disabled" : "disabled";
         String secure = secureCnxnFactory != null ? secureCnxnFactory.getLocalAddress().toString() : "disabled";
         setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getId(), plain, secure));
     }

Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java Thu Jun 23 21:45:50 2016
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
         ClientTest.class,
         FourLetterWordsTest.class,
         NullDataTest.class,
+        ReconfigTest.class,
         SessionTest.class,
         WatcherTest.class
 })

Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java Thu Jun 23 21:45:50 2016
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
         ClientTest.class,
         FourLetterWordsTest.class,
         NullDataTest.class,
+        ReconfigTest.class,
         SessionTest.class,
         WatcherTest.class
         })

Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java Thu Jun 23 21:45:50 2016
@@ -18,13 +18,18 @@
 
 package org.apache.zookeeper.test;
 
+import static java.net.InetAddress.getLoopbackAddress;
+
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
@@ -33,18 +38,16 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.jmx.CommonNames;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumStats;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.junit.Assert;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -650,6 +653,89 @@ public class ReconfigTest extends ZKTest
         closeAllHandles(zkArr);
     }
 
+    @Test
+    public void testPortChangeToBlockedPortFollower() throws Exception {
+        testPortChangeToBlockedPort(false);
+    }
+    @Test
+    public void testPortChangeToBlockedPortLeader() throws Exception {
+        testPortChangeToBlockedPort(true);
+    }
+
+    private void testPortChangeToBlockedPort(boolean testLeader) throws Exception {
+        qu = new QuorumUtil(1); // create 3 servers
+        qu.disableJMXTest = true;
+        qu.startAll();
+        ZooKeeper[] zkArr = createHandles(qu);
+
+        List<String> joiningServers = new ArrayList<String>();
+
+        int leaderIndex = getLeaderId(qu);
+        int followerIndex = leaderIndex == 1 ? 2 : 1;
+        int serverIndex = testLeader ? leaderIndex : followerIndex;
+        int reconfigIndex = testLeader ? followerIndex : leaderIndex;
+
+        // modify server's client port
+        int quorumPort = qu.getPeer(serverIndex).peer.getQuorumAddress().getPort();
+        int electionPort = qu.getPeer(serverIndex).peer.getElectionAddress().getPort();
+        int oldClientPort = qu.getPeer(serverIndex).peer.getClientPort();
+        int newClientPort = PortAssignment.unique();
+
+        try(ServerSocket ss = new ServerSocket()) {
+            ss.bind(new InetSocketAddress(getLoopbackAddress(), newClientPort));
+
+            joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort
+                        + ":" + electionPort + ":participant;localhost:" + newClientPort);
+
+            // create a /test znode and check that read/write works before
+            // any reconfig is invoked
+            testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
+
+            // Reconfigure
+            reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1);
+            Thread.sleep(1000);
+
+            // The target server's client port reconfiguration will have failed: the new port is already bound
+            zkArr[serverIndex].close();
+            zkArr[serverIndex] = new ZooKeeper("127.0.0.1:"
+                    + newClientPort,
+                    ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+                        public void process(WatchedEvent event) {}});
+
+            try {
+                Thread.sleep(1000);
+                zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1);
+                Assert.fail("New client connected to new client port!");
+            } catch (KeeperException.ConnectionLossException e) {
+                // Exception is expected
+            }
+
+            // The old port should be free (bindable again) at this stage
+
+            try (ServerSocket ss2 = new ServerSocket()) {
+                ss2.bind(new InetSocketAddress(getLoopbackAddress(), oldClientPort));
+            }
+
+            // Move back to the old port
+            joiningServers.clear();
+            joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort
+                    + ":" + electionPort + ":participant;localhost:" + oldClientPort);
+
+            reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1);
+
+            zkArr[serverIndex].close();
+            zkArr[serverIndex] = new ZooKeeper("127.0.0.1:"
+                    + oldClientPort,
+                    ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+                        public void process(WatchedEvent event) {}});
+
+            testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
+            testServerHasConfig(zkArr[serverIndex], joiningServers, null);
+            Assert.assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort());
+        }
+        closeAllHandles(zkArr);
+    }
+
     @Test
     public void testUnspecifiedClientAddress() throws Exception {
     	int[] ports = new int[3];