You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by mi...@apache.org on 2014/07/17 22:56:43 UTC
svn commit: r1611474 - in /zookeeper/trunk: ./ src/c/tests/
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/client/
src/java/test/org/apache/zookeeper/test/
Author: michim
Date: Thu Jul 17 20:56:43 2014
New Revision: 1611474
URL: http://svn.apache.org/r1611474
Log:
ZOOKEEPER-1683. ZooKeeper client NPE when updating server list on disconnected client (shralex via michim)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/c/tests/TestReconfig.cc
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Jul 17 20:56:43 2014
@@ -696,6 +696,9 @@ BUGFIXES:
ZOOKEEPER-1966. VS and line breaks (Orion Hodson via fpj)
+ ZOOKEEPER-1683. ZooKeeper client NPE when updating server list on disconnected
+ client (shralex via michim)
+
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Modified: zookeeper/trunk/src/c/tests/TestReconfig.cc
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/tests/TestReconfig.cc?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/c/tests/TestReconfig.cc (original)
+++ zookeeper/trunk/src/c/tests/TestReconfig.cc Thu Jul 17 20:56:43 2014
@@ -470,11 +470,13 @@ public:
// Assert next server is in the 'new' list
size_t found = newComing.find(next);
- CPPUNIT_ASSERT(found != string::npos);
+ CPPUNIT_ASSERT_MESSAGE(next + " not in newComing list",
+ found != string::npos);
// Assert not in seen list then append
found = seen.find(next);
- CPPUNIT_ASSERT(found == string::npos);
+ CPPUNIT_ASSERT_MESSAGE(next + " in seen list",
+ found == string::npos);
seen += found + ", ";
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Thu Jul 17 20:56:43 2014
@@ -377,7 +377,12 @@ public class ClientCnxnSocketNIO extends
@Override
void testableCloseSocket() throws IOException {
LOG.info("testableCloseSocket() called");
- ((SocketChannel) sockKey.channel()).socket().close();
+ // sockKey may be concurrently accessed by multiple threads.
+ // Copy it into tmp so the null check and the use see the same reference (avoids a race condition)
+ SelectionKey tmp = sockKey;
+ if (tmp!=null) {
+ ((SocketChannel) tmp.channel()).socket().close();
+ }
}
@Override
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/client/StaticHostProvider.java Thu Jul 17 20:56:43 2014
@@ -152,8 +152,9 @@ public final class StaticHostProvider im
@Override
- public boolean updateServerList(Collection<InetSocketAddress> serverAddresses,
- InetSocketAddress currentHost) {
+ public synchronized boolean updateServerList(
+ Collection<InetSocketAddress> serverAddresses,
+ InetSocketAddress currentHost) {
// Resolve server addresses and shuffle them
List<InetSocketAddress> resolvedList = resolveAndShuffle(serverAddresses);
if (resolvedList.isEmpty()) {
@@ -162,74 +163,106 @@ public final class StaticHostProvider im
}
// Check if client's current server is in the new list of servers
boolean myServerInNewConfig = false;
+
+ InetSocketAddress myServer = currentHost;
+
+ // choose "current" server according to the client rebalancing algorithm
+ if (reconfigMode) {
+ myServer = next(0);
+ }
+
+ // if the client is not currently connected to any server
+ if (myServer == null) {
+ // we can only get here when reconfigMode == false, since next() shouldn't return null
+ if (lastIndex >= 0) {
+ // take the last server to which we were connected
+ myServer = this.serverAddresses.get(lastIndex);
+ } else {
+ // take the first server on the list
+ myServer = this.serverAddresses.get(0);
+ }
+ }
+
for (InetSocketAddress addr : resolvedList) {
- if (addr.getPort() == currentHost.getPort() &&
- ((addr.getAddress()!=null && currentHost.getAddress()!=null &&
- addr.getAddress().equals(currentHost.getAddress()))
- || addr.getHostName().equals(currentHost.getHostName()))) {
- myServerInNewConfig = true;
- break;
- }
+ if (addr.getPort() == myServer.getPort()
+ && ((addr.getAddress() != null
+ && myServer.getAddress() != null && addr
+ .getAddress().equals(myServer.getAddress())) || addr
+ .getHostName().equals(myServer.getHostName()))) {
+ myServerInNewConfig = true;
+ break;
+ }
}
- synchronized(this) {
- reconfigMode = true;
+ reconfigMode = true;
- newServers.clear();
- oldServers.clear();
- // Divide the new servers into oldServers that were in the previous list
- // and newServers that were not in the previous list
- for (InetSocketAddress resolvedAddress : resolvedList) {
- if (this.serverAddresses.contains(resolvedAddress)) {
- oldServers.add(resolvedAddress);
- } else {
- newServers.add(resolvedAddress);
- }
- }
+ newServers.clear();
+ oldServers.clear();
+ // Divide the new servers into oldServers that were in the previous list
+ // and newServers that were not in the previous list
+ for (InetSocketAddress resolvedAddress : resolvedList) {
+ if (this.serverAddresses.contains(resolvedAddress)) {
+ oldServers.add(resolvedAddress);
+ } else {
+ newServers.add(resolvedAddress);
+ }
+ }
- int numOld = oldServers.size();
- int numNew = newServers.size();
+ int numOld = oldServers.size();
+ int numNew = newServers.size();
- // number of servers increased
- if (numOld + numNew > this.serverAddresses.size()) {
- if (myServerInNewConfig) {
- // my server is in new config, but load should be decreased.
- // Need to decide if this client
- // is moving to one of the new servers
- if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses
- .size()) / (numOld + numNew))) {
- pNew = 1;
- pOld = 0;
- } else {
- // do nothing special - stay with the current server
- reconfigMode = false;
- }
- } else {
- // my server is not in new config, and load on old servers must
- // be decreased, so connect to
- // one of the new servers
+ // number of servers increased
+ if (numOld + numNew > this.serverAddresses.size()) {
+ if (myServerInNewConfig) {
+ // my server is in new config, but load should be decreased.
+ // Need to decide if this client
+ // is moving to one of the new servers
+ if (sourceOfRandomness.nextFloat() <= (1 - ((float) this.serverAddresses
+ .size()) / (numOld + numNew))) {
pNew = 1;
pOld = 0;
- }
- } else { // number of servers stayed the same or decreased
- if (myServerInNewConfig) {
- // my server is in new config, and load should be increased, so
- // stay with this server and do nothing special
- reconfigMode = false;
} else {
- pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew))))
- / ((numOld + numNew) * (this.serverAddresses.size() - numOld));
- pNew = 1 - pOld;
+ // do nothing special - stay with the current server
+ reconfigMode = false;
}
+ } else {
+ // my server is not in new config, and load on old servers must
+ // be decreased, so connect to
+ // one of the new servers
+ pNew = 1;
+ pOld = 0;
+ }
+ } else { // number of servers stayed the same or decreased
+ if (myServerInNewConfig) {
+ // my server is in new config, and load should be increased, so
+ // stay with this server and do nothing special
+ reconfigMode = false;
+ } else {
+ pOld = ((float) (numOld * (this.serverAddresses.size() - (numOld + numNew))))
+ / ((numOld + numNew) * (this.serverAddresses.size() - numOld));
+ pNew = 1 - pOld;
}
+ }
- this.serverAddresses = resolvedList;
- currentIndexOld = -1;
- currentIndexNew = -1;
+ if (!reconfigMode) {
+ currentIndex = resolvedList.indexOf(getServerAtCurrentIndex());
+ } else {
currentIndex = -1;
- lastIndex = -1;
- return reconfigMode;
}
+ this.serverAddresses = resolvedList;
+ currentIndexOld = -1;
+ currentIndexNew = -1;
+ lastIndex = currentIndex;
+ return reconfigMode;
+ }
+
+ public synchronized InetSocketAddress getServerAtIndex(int i) {
+ if (i < 0 || i >= serverAddresses.size()) return null;
+ return serverAddresses.get(i);
+ }
+
+ public synchronized InetSocketAddress getServerAtCurrentIndex() {
+ return getServerAtIndex(currentIndex);
}
public synchronized int size() {
@@ -279,7 +312,10 @@ public final class StaticHostProvider im
synchronized(this) {
if (reconfigMode) {
addr = nextHostInReconfigMode();
- if (addr != null) return addr;
+ if (addr != null) {
+ currentIndex = serverAddresses.indexOf(addr);
+ return addr;
+ }
//tried all servers and couldn't connect
reconfigMode = false;
needToSleep = (spinDelay > 0);
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java?rev=1611474&r1=1611473&r2=1611474&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StaticHostProviderTest.java Thu Jul 17 20:56:43 2014
@@ -123,21 +123,25 @@ public class StaticHostProviderTest exte
// Number of machines becomes smaller, my server is in the new cluster
boolean disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(!disconnectRequired);
-
+ hostProvider.onConnected();
+
// Number of machines stayed the same, my server is in the new cluster
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(!disconnectRequired);
+ hostProvider.onConnected();
// Number of machines became smaller, my server is not in the new
// cluster
newList = getServerAddresses((byte) 2); // 10.10.10.2:1236, 10.10.10.1:1235
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(disconnectRequired);
+ hostProvider.onConnected();
// Number of machines stayed the same, my server is not in the new
// cluster
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(disconnectRequired);
+ hostProvider.onConnected();
// Number of machines increased, my server is not in the new cluster
newList = new ArrayList<InetSocketAddress>(3);
@@ -147,6 +151,7 @@ public class StaticHostProviderTest exte
myServer = new InetSocketAddress(InetAddress.getByAddress(new byte[]{10, 10, 10, 1}), 1235);
disconnectRequired = hostProvider.updateServerList(newList, myServer);
assertTrue(disconnectRequired);
+ hostProvider.onConnected();
// Number of machines increased, my server is in the new cluster
// Here whether to move or not depends on the difference of cluster
@@ -162,6 +167,7 @@ public class StaticHostProviderTest exte
if (disconnectRequired)
numDisconnects++;
}
+ hostProvider.onConnected();
// should be numClients/10 in expectation, we test that its numClients/10 +- slackPercent
assertTrue(numDisconnects < upperboundCPS(numClients, 10));
@@ -227,6 +233,7 @@ public class StaticHostProviderTest exte
}
assertEquals(first, hostProvider.next(0));
+ hostProvider.onConnected();
}
@Test
@@ -242,6 +249,7 @@ public class StaticHostProviderTest exte
hostProviderArray[i] = getHostProvider((byte) 9);
curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ hostProviderArray[i].onConnected();
}
for (int i = 0; i < 9; i++) {
@@ -257,6 +265,7 @@ public class StaticHostProviderTest exte
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ hostProviderArray[i].onConnected();
}
for (int i = 0; i < 8; i++) {
@@ -273,6 +282,7 @@ public class StaticHostProviderTest exte
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ hostProviderArray[i].onConnected();
}
for (int i = 0; i < 6; i++) {
@@ -295,6 +305,7 @@ public class StaticHostProviderTest exte
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ hostProviderArray[i].onConnected();
}
assertTrue(numClientsPerHost[0] == 0);
@@ -312,6 +323,130 @@ public class StaticHostProviderTest exte
disconnectRequired = hostProviderArray[i].updateServerList(newList, curHostForEachClient[i]);
if (disconnectRequired) curHostForEachClient[i] = hostProviderArray[i].next(0);
numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ hostProviderArray[i].onConnected();
+ }
+
+ for (int i = 0; i < 9; i++) {
+ assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 9));
+ assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 9));
+ }
+ }
+
+ @Test
+ public void testNoCurrentHostDuringNormalMode() throws UnknownHostException {
+ // Start with 9 servers and 10000 clients
+ boolean disconnectRequired;
+ StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients];
+ InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients];
+ int[] numClientsPerHost = new int[9];
+
+ // initialization
+ for (int i = 0; i < numClients; i++) {
+ hostProviderArray[i] = getHostProvider((byte) 9);
+ if (i >= (numClients / 2)) {
+ curHostForEachClient[i] = hostProviderArray[i].next(0);
+ } else {
+ // it's supposed to be the first server on serverList.
+ // we'll set it later, see below (*)
+ curHostForEachClient[i] = null;
+ }
+ }
+
+ // remove hosts 7 and 8 (the last two in a list of 9 hosts)
+ Collection<InetSocketAddress> newList = getServerAddresses((byte) 7);
+
+ for (int i = 0; i < numClients; i++) {
+ // tests the case currentHost == null && lastIndex == -1
+ // calls next for clients with index < numClients/2
+ disconnectRequired = hostProviderArray[i].updateServerList(newList,
+ curHostForEachClient[i]);
+ if (disconnectRequired)
+ curHostForEachClient[i] = hostProviderArray[i].next(0);
+ else if (curHostForEachClient[i] == null) {
+ // (*) setting it to what it should be
+ curHostForEachClient[i] = hostProviderArray[i]
+ .getServerAtIndex(0);
+ }
+ numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ // sets lastIndex, resets reconfigMode
+ hostProviderArray[i].onConnected();
+ }
+
+ for (int i = 0; i < 7; i++) {
+ assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 7));
+ assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 7));
+ numClientsPerHost[i] = 0; // prepare for next test
+ }
+ assertTrue(numClientsPerHost[7] == 0);
+ assertTrue(numClientsPerHost[8] == 0);
+
+ // add back server 7
+ newList = getServerAddresses((byte) 8);
+
+ for (int i = 0; i < numClients; i++) {
+ InetSocketAddress myServer = (i < (numClients / 2)) ? null
+ : curHostForEachClient[i];
+ // tests the case currentHost == null && lastIndex >= 0
+ disconnectRequired = hostProviderArray[i].updateServerList(newList,
+ myServer);
+ if (disconnectRequired)
+ curHostForEachClient[i] = hostProviderArray[i].next(0);
+ numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ hostProviderArray[i].onConnected();
+ }
+
+ for (int i = 0; i < 8; i++) {
+ assertTrue(numClientsPerHost[i] <= upperboundCPS(numClients, 8));
+ assertTrue(numClientsPerHost[i] >= lowerboundCPS(numClients, 8));
+ }
+ }
+
+ @Test
+ public void testReconfigDuringReconfigMode() throws UnknownHostException {
+ // Start with 9 servers and 10000 clients
+ boolean disconnectRequired;
+ StaticHostProvider[] hostProviderArray = new StaticHostProvider[numClients];
+ InetSocketAddress[] curHostForEachClient = new InetSocketAddress[numClients];
+ int[] numClientsPerHost = new int[9];
+
+ // initialization
+ for (int i = 0; i < numClients; i++) {
+ hostProviderArray[i] = getHostProvider((byte) 9);
+ curHostForEachClient[i] = hostProviderArray[i].next(0);
+ }
+
+ // remove hosts 7 and 8 (the last two in a list of 9 hosts)
+ Collection<InetSocketAddress> newList = getServerAddresses((byte) 7);
+
+ for (int i = 0; i < numClients; i++) {
+ // sets reconfigMode
+ hostProviderArray[i].updateServerList(newList,
+ curHostForEachClient[i]);
+ }
+
+ // add back servers 7 and 8 while still in reconfigMode (we didn't call
+ // next)
+ newList = getServerAddresses((byte) 9);
+
+ for (int i = 0; i < numClients; i++) {
+ InetSocketAddress myServer = (i < (numClients / 2)) ? null
+ : curHostForEachClient[i];
+ // for i < (numClients/2) this tests the case currentHost == null &&
+ // reconfigMode = true
+ // for i >= (numClients/2) this tests the case currentHost!=null &&
+ // reconfigMode = true
+ disconnectRequired = hostProviderArray[i].updateServerList(newList,
+ myServer);
+ if (disconnectRequired)
+ curHostForEachClient[i] = hostProviderArray[i].next(0);
+ else {
+ // currentIndex was set by the call to updateServerList, which
+ // called next
+ curHostForEachClient[i] = hostProviderArray[i]
+ .getServerAtCurrentIndex();
+ }
+ numClientsPerHost[curHostForEachClient[i].getPort() - 1235]++;
+ hostProviderArray[i].onConnected();
}
for (int i = 0; i < 9; i++) {