You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by mi...@apache.org on 2014/07/17 22:56:43 UTC

svn commit: r1611474 - in /zookeeper/trunk: ./ src/c/tests/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/client/ src/java/test/org/apache/zookeeper/test/

Author: michim
Date: Thu Jul 17 20:56:43 2014
New Revision: 1611474

URL: http://svn.apache.org/r1611474
Log:
ZOOKEEPER-1683. ZooKeeper client NPE when updating server list on disconnected client (shralex via michim)

Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/c/tests/TestReconfig.cc
    zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Jul 17 20:56:43 2014
@@ -696,6 +696,9 @@ BUGFIXES:
 
   ZOOKEEPER-1966. VS and line breaks (Orion Hodson via fpj)
 
+  ZOOKEEPER-1683. ZooKeeper client NPE when updating server list on disconnected
+  client (shralex via michim)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/c/tests/TestReconfig.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReconfig.cc?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReconfig.cc (original)
+++ zookeeper/trunk/src/c/tests/TestReconfig.cc Thu Jul 17 20:56:43 2014
@@ -470,11 +470,13 @@ public:
 
             // Assert next server is in the 'new' list
             size_t found = newComing.find(next);
-            CPPUNIT_ASSERT(found != string::npos);
+            CPPUNIT_ASSERT_MESSAGE(next + " not in newComing list",
+                                   found != string::npos);
 
             // Assert not in seen list then append
             found = seen.find(next);
-            CPPUNIT_ASSERT(found == string::npos);
+            CPPUNIT_ASSERT_MESSAGE(next + " in seen list",
+                                   found == string::npos);
             seen += found + ", ";
         }
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Thu Jul 17 20:56:43 2014
@@ -377,7 +377,12 @@ public class ClientCnxnSocketNIO extends
     @Override
     void testableCloseSocket() throws IOException {
         LOG.info("testableCloseSocket() called");
-        ((SocketChannel) sockKey.channel()).socket().close();
+        // sockKey may be concurrently accessed by multiple
+        // threads. We use tmp here to avoid a race condition
+        SelectionKey tmp = sockKey;
+        if (tmp!=null) {
+           ((SocketChannel) tmp.channel()).socket().close();
+        }
     }
 
     @Override

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java Thu Jul 17 20:56:43 2014
@@ -152,8 +152,9 @@ public final class StaticHostProvider im
 
 
     @Override
-    public boolean updateServerList(Collection<InetSocketAddress> serverAddresses, 
-        InetSocketAddress currentHost) {
+    public synchronized boolean updateServerList(
+            Collection<InetSocketAddress> serverAddresses,
+            InetSocketAddress currentHost) {
         // Resolve server addresses and shuffle them
         List<InetSocketAddress> resolvedList = resolveAndShuffle(serverAddresses);
         if (resolvedList.isEmpty()) {
@@ -162,74 +163,106 @@ public final class StaticHostProvider im
         }
         // Check if client's current server is in the new list of servers
         boolean myServerInNewConfig = false;
+
+        InetSocketAddress myServer = currentHost;
+
+        // choose "current" server according to the client rebalancing algorithm
+        if (reconfigMode) {
+            myServer = next(0);
+        }
+
+        // if the client is not currently connected to any server
+        if (myServer == null) {
+            // reconfigMode = false (next shouldn't return null).
+            if (lastIndex >= 0) {
+                // take the last server to which we were connected
+                myServer = this.serverAddresses.get(lastIndex);
+            } else {
+                // take the first server on the list
+                myServer = this.serverAddresses.get(0);
+            }
+        }
+
         for (InetSocketAddress addr : resolvedList) {
-            if (addr.getPort() == currentHost.getPort() &&
-                    ((addr.getAddress()!=null && currentHost.getAddress()!=null &&
-                      addr.getAddress().equals(currentHost.getAddress()))
-                     || addr.getHostName().equals(currentHost.getHostName()))) {
-                   myServerInNewConfig = true;
-                   break;
-               }
+            if (addr.getPort() == myServer.getPort()
+                    && ((addr.getAddress() != null
+                            && myServer.getAddress() != null && addr
+                            .getAddress().equals(myServer.getAddress())) || addr
+                            .getHostName().equals(myServer.getHostName()))) {
+                myServerInNewConfig = true;
+                break;
+            }
         }
 
-        synchronized(this) {
-            reconfigMode = true;
+        reconfigMode = true;
 
-            newServers.clear();
-            oldServers.clear();
-            // Divide the new servers into oldServers that were in the previous list
-            // and newServers that were not in the previous list
-            for (InetSocketAddress resolvedAddress : resolvedList) {                
-                if (this.serverAddresses.contains(resolvedAddress)) {
-                    oldServers.add(resolvedAddress);
-                } else {
-                    newServers.add(resolvedAddress);
-                }
-            }        
+        newServers.clear();
+        oldServers.clear();
+        // Divide the new servers into oldServers that were in the previous list
+        // and newServers that were not in the previous list
+        for (InetSocketAddress resolvedAddress : resolvedList) {
+            if (this.serverAddresses.contains(resolvedAddress)) {
+                oldServers.add(resolvedAddress);
+            } else {
+                newServers.add(resolvedAddress);
+            }
+        }
 
-            int numOld = oldServers.size();
-            int numNew = newServers.size();                        
+        int numOld = oldServers.size();
+        int numNew = newServers.size();
 
-            // number of servers increased
-            if (numOld + numNew > this.serverAddresses.size()) {
-                if (myServerInNewConfig) {
-                    // my server is in new config, but load should be decreased.
-                    // Need to decide if this client
-                    // is moving to one of the new servers
-                    if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses
-                            .size()) / (numOld + numNew))) {
-                        pNew = 1;
-                        pOld = 0;
-                    } else {
-                        // do nothing special - stay with the current server
-                        reconfigMode = false;
-                    }
-                } else {
-                    // my server is not in new config, and load on old servers must
-                    // be decreased, so connect to
-                    // one of the new servers
+        // number of servers increased
+        if (numOld + numNew > this.serverAddresses.size()) {
+            if (myServerInNewConfig) {
+                // my server is in new config, but load should be decreased.
+                // Need to decide if this client
+                // is moving to one of the new servers
+                if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses
+                        .size()) / (numOld + numNew))) {
                     pNew = 1;
                     pOld = 0;
-                }
-            } else { // number of servers stayed the same or decreased
-                if (myServerInNewConfig) {
-                    // my server is in new config, and load should be increased, so
-                    // stay with this server and do nothing special
-                    reconfigMode = false;
                 } else {
-                    pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew))))
-                            / ((numOld + numNew) * (this.serverAddresses.size() - numOld));
-                    pNew = 1 - pOld;
+                    // do nothing special - stay with the current server
+                    reconfigMode = false;
                 }
+            } else {
+                // my server is not in new config, and load on old servers must
+                // be decreased, so connect to
+                // one of the new servers
+                pNew = 1;
+                pOld = 0;
+            }
+        } else { // number of servers stayed the same or decreased
+            if (myServerInNewConfig) {
+                // my server is in new config, and load should be increased, so
+                // stay with this server and do nothing special
+                reconfigMode = false;
+            } else {
+                pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew))))
+                        / ((numOld + numNew) * (this.serverAddresses.size() - numOld));
+                pNew = 1 - pOld;
             }
+        }
 
-            this.serverAddresses = resolvedList;    
-            currentIndexOld = -1;
-            currentIndexNew = -1; 
+        if (!reconfigMode) {
+            currentIndex = resolvedList.indexOf(getServerAtCurrentIndex());
+        } else {
             currentIndex = -1;
-            lastIndex = -1;                
-            return reconfigMode;
         }
+        this.serverAddresses = resolvedList;
+        currentIndexOld = -1;
+        currentIndexNew = -1;
+        lastIndex = currentIndex;
+        return reconfigMode;
+    }
+
+    public synchronized InetSocketAddress getServerAtIndex(int i) {
+    	if (i < 0 || i >= serverAddresses.size()) return null;
+    	return serverAddresses.get(i);
+    }
+    
+    public synchronized InetSocketAddress getServerAtCurrentIndex() {
+    	return getServerAtIndex(currentIndex);
     }
 
     public synchronized int size() {
@@ -279,7 +312,10 @@ public final class StaticHostProvider im
         synchronized(this) {
             if (reconfigMode) {
                 addr = nextHostInReconfigMode();
-                if (addr != null) return addr;                
+                if (addr != null) {
+                	currentIndex = serverAddresses.indexOf(addr);
+                	return addr;                
+                }
                 //tried all servers and couldn't connect
                 reconfigMode = false;
                 needToSleep = (spinDelay > 0);

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java Thu Jul 17 20:56:43 2014
@@ -123,21 +123,25 @@ public class StaticHostProviderTest exte
         // Number of machines becomes smaller, my server is in the new cluster
         boolean disconnectRequired = hostProvider.updateServerList(newList, myServer);
         assertTrue(!disconnectRequired);
-
+        hostProvider.onConnected();
+        
         // Number of machines stayed the same, my server is in the new cluster
         disconnectRequired = hostProvider.updateServerList(newList, myServer);
         assertTrue(!disconnectRequired);
+        hostProvider.onConnected();
 
         // Number of machines became smaller, my server is not in the new
         // cluster
         newList = getServerAddresses((byte) 2); // 10.10.10.2:1236, 10.10.10.1:1235
         disconnectRequired = hostProvider.updateServerList(newList, myServer);
         assertTrue(disconnectRequired);
+        hostProvider.onConnected();
 
         // Number of machines stayed the same, my server is not in the new
         // cluster
         disconnectRequired = hostProvider.updateServerList(newList, myServer);
         assertTrue(disconnectRequired);
+        hostProvider.onConnected();
 
         // Number of machines increased, my server is not in the new cluster
         newList = new ArrayList<InetSocketAddress>(3);
@@ -147,6 +151,7 @@ public class StaticHostProviderTest exte
         myServer = new InetSocketAddress(InetAddress.getByAddress(new byte[]{10, 10, 10, 1}), 1235);
         disconnectRequired = hostProvider.updateServerList(newList, myServer);
         assertTrue(disconnectRequired);
+        hostProvider.onConnected();
 
         // Number of machines increased, my server is in the new cluster
         // Here whether to move or not depends on the difference of cluster
@@ -162,6 +167,7 @@ public class StaticHostProviderTest exte
             if (disconnectRequired)
                 numDisconnects++;
         }
+        hostProvider.onConnected();
 
        // should be numClients/10 in expectation, we test that its numClients/10 +- slackPercent 
         assertTrue(numDisconnects < upperboundCPS(numClients, 10));
@@ -227,6 +233,7 @@ public class StaticHostProviderTest exte
         }
 
         assertEquals(first, hostProvider.next(0));
+        hostProvider.onConnected();
     }
 
     @Test
@@ -242,6 +249,7 @@ public class StaticHostProviderTest exte
             hostProviderArray[i] = getHostProvider((byte) 9);
             curHostForEachClient[i] = hostProviderArray[i].next(0);
             numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            hostProviderArray[i].onConnected();
         }
 
         for (int i = 0; i < 9; i++) {
@@ -257,6 +265,7 @@ public class StaticHostProviderTest exte
             disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
             if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
             numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            hostProviderArray[i].onConnected();
         }
 
         for (int i = 0; i < 8; i++) {
@@ -273,6 +282,7 @@ public class StaticHostProviderTest exte
             disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
             if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
             numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            hostProviderArray[i].onConnected();
         }
 
         for (int i = 0; i < 6; i++) {
@@ -295,6 +305,7 @@ public class StaticHostProviderTest exte
             disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
             if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
             numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            hostProviderArray[i].onConnected();
         }
 
         assertTrue(numClientsPerHost[0] == 0);
@@ -312,6 +323,130 @@ public class StaticHostProviderTest exte
             disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
             if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
             numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            hostProviderArray[i].onConnected();
+        }
+
+        for (int i = 0; i < 9; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9));
+        }
+    }
+
+    @Test
+    public void testNoCurrentHostDuringNormalMode() throws UnknownHostException {
+        // Start with 9 servers and 10000 clients
+        boolean disconnectRequired;
+        StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients];
+        InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients];
+        int[] numClientsPerHost = new int[9];
+
+        // initialization
+        for (int i = 0; i < numClients; i++) {
+            hostProviderArray[i] = getHostProvider((byte) 9);
+            if (i >= (numClients / 2)) {
+                curHostForEachClient[i] = hostProviderArray[i].next(0);
+            } else {
+                // its supposed to be the first server on serverList.
+                // we'll set it later, see below (*)
+                curHostForEachClient[i] = null;
+            }
+        }
+
+        // remove hosts 7 and 8 (the last two in a list of 9 hosts)
+        Collection<InetSocketAddress> newList = getServerAddresses((byte) 7);
+
+        for (int i = 0; i < numClients; i++) {
+            // tests the case currentHost == null && lastIndex == -1
+            // calls next for clients with index < numClients/2
+            disconnectRequired = hostProviderArray[i].updateServerList(newList,
+                    curHostForEachClient[i]);
+            if (disconnectRequired)
+                curHostForEachClient[i] = hostProviderArray[i].next(0);
+            else if (curHostForEachClient[i] == null) {
+                // (*) setting it to what it should be
+                curHostForEachClient[i] = hostProviderArray[i]
+                        .getServerAtIndex(0);
+            }
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            // sets lastIndex, resets reconfigMode
+            hostProviderArray[i].onConnected();
+        }
+
+        for (int i = 0; i < 7; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 7));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 7));
+            numClientsPerHost[i] = 0; // prepare for next test
+        }
+        assertTrue(numClientsPerHost[7] == 0);
+        assertTrue(numClientsPerHost[8] == 0);
+
+        // add back server 7
+        newList = getServerAddresses((byte) 8);
+
+        for (int i = 0; i < numClients; i++) {
+            InetSocketAddress myServer = (i < (numClients / 2)) ? null
+                    : curHostForEachClient[i];
+            // tests the case currentHost == null && lastIndex >= 0
+            disconnectRequired = hostProviderArray[i].updateServerList(newList,
+                    myServer);
+            if (disconnectRequired)
+                curHostForEachClient[i] = hostProviderArray[i].next(0);
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            hostProviderArray[i].onConnected();
+        }
+
+        for (int i = 0; i < 8; i++) {
+            assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8));
+            assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8));
+        }
+    }
+
+    @Test
+    public void testReconfigDuringReconfigMode() throws UnknownHostException {
+        // Start with 9 servers and 10000 clients
+        boolean disconnectRequired;
+        StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients];
+        InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients];
+        int[] numClientsPerHost = new int[9];
+
+        // initialization
+        for (int i = 0; i < numClients; i++) {
+            hostProviderArray[i] = getHostProvider((byte) 9);
+            curHostForEachClient[i] = hostProviderArray[i].next(0);
+        }
+
+        // remove hosts 7 and 8 (the last two in a list of 9 hosts)
+        Collection<InetSocketAddress> newList = getServerAddresses((byte) 7);
+
+        for (int i = 0; i < numClients; i++) {
+            // sets reconfigMode
+            hostProviderArray[i].updateServerList(newList,
+                    curHostForEachClient[i]);
+        }
+
+        // add back servers 7 and 8 while still in reconfigMode (we didn't call
+        // next)
+        newList = getServerAddresses((byte) 9);
+
+        for (int i = 0; i < numClients; i++) {
+            InetSocketAddress myServer = (i < (numClients / 2)) ? null
+                    : curHostForEachClient[i];
+            // for i < (numClients/2) this tests the case currentHost == null &&
+            // reconfigMode = true
+            // for i >= (numClients/2) this tests the case currentHost!=null &&
+            // reconfigMode = true
+            disconnectRequired = hostProviderArray[i].updateServerList(newList,
+                    myServer);
+            if (disconnectRequired)
+                curHostForEachClient[i] = hostProviderArray[i].next(0);
+            else {
+                // currentIndex was set by the call to updateServerList, which
+                // called next
+                curHostForEachClient[i] = hostProviderArray[i]
+                        .getServerAtCurrentIndex();
+            }
+            numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+            hostProviderArray[i].onConnected();
         }
 
         for (int i = 0; i < 9; i++) {