You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by cn...@apache.org on 2016/06/23 21:45:51 UTC
svn commit: r1750026 - in /zookeeper/branches/branch-3.5: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: cnauroth
Date: Thu Jun 23 21:45:50 2016
New Revision: 1750026
URL: http://svn.apache.org/viewvc?rev=1750026&view=rev
Log:
ZOOKEEPER-2366: Reconfiguration of client port causes a socket leak. (fpj via cnauroth)
Modified:
zookeeper/branches/branch-3.5/CHANGES.txt
zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java
Modified: zookeeper/branches/branch-3.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/CHANGES.txt?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.5/CHANGES.txt Thu Jun 23 21:45:50 2016
@@ -165,6 +165,9 @@ BUGFIXES:
ZOOKEEPER-2380: Deadlock between leader shutdown and forwarding ACK to the
leader (Arshad Mohammad via cnauroth)
+ ZOOKEEPER-2366: Reconfiguration of client port causes a socket leak.
+ (fpj via cnauroth)
+
IMPROVEMENTS:
ZOOKEEPER-2270: Allow MBeanRegistry to be overridden for better unit tests
Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Thu Jun 23 21:45:50 2016
@@ -688,31 +688,42 @@ public class NIOServerCnxnFactory extend
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
-
+
+ private void tryClose(ServerSocketChannel s) {
+ try {
+ s.close();
+ } catch (IOException sse) {
+ LOG.error("Error while closing server socket.", sse);
+ }
+ }
+
@Override
- public void reconfigure(InetSocketAddress addr){
+ public void reconfigure(InetSocketAddress addr) {
ServerSocketChannel oldSS = ss;
try {
- this.ss = ServerSocketChannel.open();
- ss.socket().setReuseAddress(true);
- LOG.info("binding to port " + addr);
- ss.socket().bind(addr);
- ss.configureBlocking(false);
- acceptThread.setReconfiguring();
- oldSS.close();
- acceptThread.wakeupSelector();
- try {
- acceptThread.join();
- } catch (InterruptedException e) {
- LOG.error("Error joining old acceptThread when reconfiguring client port " + e.getMessage());
- }
- acceptThread = new AcceptThread(ss, addr, selectorThreads);
- acceptThread.start();
+ this.ss = ServerSocketChannel.open();
+ ss.socket().setReuseAddress(true);
+ LOG.info("binding to port " + addr);
+ ss.socket().bind(addr);
+ ss.configureBlocking(false);
+ acceptThread.setReconfiguring();
+ tryClose(oldSS);
+ acceptThread.wakeupSelector();
+ try {
+ acceptThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("Error joining old acceptThread when reconfiguring client port {}",
+ e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ acceptThread = new AcceptThread(ss, addr, selectorThreads);
+ acceptThread.start();
} catch(IOException e) {
- LOG.error("Error reconfiguring client port to " + addr + " " + e.getMessage());
+ LOG.error("Error reconfiguring client port to {} {}", addr, e.getMessage());
+ tryClose(oldSS);
}
}
-
+
/** {@inheritDoc} */
public int getMaxClientCnxnsPerHost() {
return maxClientCnxns;
Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Thu Jun 23 21:45:50 2016
@@ -488,13 +488,17 @@ public class NettyServerCnxnFactory exte
parentChannel = bootstrap.bind(localAddress);
}
- public void reconfigure(InetSocketAddress addr)
- {
+ public void reconfigure(InetSocketAddress addr) {
Channel oldChannel = parentChannel;
- LOG.info("binding to port " + addr);
- parentChannel = bootstrap.bind(addr);
- localAddress = addr;
- oldChannel.close();
+ try {
+ LOG.info("binding to port {}", addr);
+ parentChannel = bootstrap.bind(addr);
+ localAddress = addr;
+ } catch (Exception e) {
+ LOG.error("Error while reconfiguring", e);
+ } finally {
+ oldChannel.close();
+ }
}
@Override
Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jun 23 21:45:50 2016
@@ -710,7 +710,7 @@ public class Leader {
// that different operations wait for different sets of acks, and we still want to enforce
// that they are committed in order. Currently we only permit one outstanding reconfiguration
// such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
- // pending all wait for a quorum of old and new config, so its not possible to get enough acks
+ // pending all wait for a quorum of old and new config, so it's not possible to get enough acks
// for an operation without getting enough acks for preceding ops. But in the future if multiple
// concurrent reconfigs are allowed, this can happen.
if (outstandingProposals.containsKey(zxid - 1)) return false;
@@ -751,7 +751,7 @@ public class Leader {
QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
-
+
if (designatedLeader != self.getId()) {
allowedToCommit = false;
}
@@ -1261,7 +1261,7 @@ public class Leader {
QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());
-
+
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId()) {
allowedToCommit = false;
Modified: zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/branches/branch-3.5/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Jun 23 21:45:50 2016
@@ -1728,7 +1728,7 @@ public class QuorumPeer extends ZooKeepe
writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
}
- public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE){
+ public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE) {
InetSocketAddress oldClientAddr = getClientAddress();
// update last committed quorum verifier, write the new config to disk
@@ -1756,8 +1756,8 @@ public class QuorumPeer extends ZooKeepe
cnxnFactory.reconfigure(myNewQS.clientAddr);
updateThreadName();
}
-
- boolean roleChange = updateLearnerType(qv);
+
+ boolean roleChange = updateLearnerType(qv);
boolean leaderChange = false;
if (suggestedLeaderId != null) {
// zxid should be non-null too
@@ -1875,7 +1875,9 @@ public class QuorumPeer extends ZooKeepe
}
private void updateThreadName() {
- String plain = cnxnFactory != null ? cnxnFactory.getLocalAddress().toString() : "disabled";
+ String plain = cnxnFactory != null ?
+ cnxnFactory.getLocalAddress() != null ?
+ cnxnFactory.getLocalAddress().toString() : "disabled" : "disabled";
String secure = secureCnxnFactory != null ? secureCnxnFactory.getLocalAddress().toString() : "disabled";
setName(String.format("QuorumPeer[myid=%d](plain=%s)(secure=%s)", getId(), plain, secure));
}
Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NettyNettySuiteTest.java Thu Jun 23 21:45:50 2016
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
ClientTest.class,
FourLetterWordsTest.class,
NullDataTest.class,
+ ReconfigTest.class,
SessionTest.class,
WatcherTest.class
})
Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java Thu Jun 23 21:45:50 2016
@@ -30,6 +30,7 @@ import org.junit.runners.Suite;
ClientTest.class,
FourLetterWordsTest.class,
NullDataTest.class,
+ ReconfigTest.class,
SessionTest.class,
WatcherTest.class
})
Modified: zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java?rev=1750026&r1=1750025&r2=1750026&view=diff
==============================================================================
--- zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java (original)
+++ zookeeper/branches/branch-3.5/src/java/test/org/apache/zookeeper/test/ReconfigTest.java Thu Jun 23 21:45:50 2016
@@ -18,13 +18,18 @@
package org.apache.zookeeper.test;
+import static java.net.InetAddress.getLoopbackAddress;
+
import java.io.IOException;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
@@ -33,18 +38,16 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.CommonNames;
import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumStats;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-import org.junit.Assert;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -650,6 +653,89 @@ public class ReconfigTest extends ZKTest
closeAllHandles(zkArr);
}
+ @Test
+ public void testPortChangeToBlockedPortFollower() throws Exception {
+ testPortChangeToBlockedPort(false);
+ }
+ @Test
+ public void testPortChangeToBlockedPortLeader() throws Exception {
+ testPortChangeToBlockedPort(true);
+ }
+
+ private void testPortChangeToBlockedPort(boolean testLeader) throws Exception {
+ qu = new QuorumUtil(1); // create 3 servers
+ qu.disableJMXTest = true;
+ qu.startAll();
+ ZooKeeper[] zkArr = createHandles(qu);
+
+ List<String> joiningServers = new ArrayList<String>();
+
+ int leaderIndex = getLeaderId(qu);
+ int followerIndex = leaderIndex == 1 ? 2 : 1;
+ int serverIndex = testLeader ? leaderIndex : followerIndex;
+ int reconfigIndex = testLeader ? followerIndex : leaderIndex;
+
+ // modify server's client port
+ int quorumPort = qu.getPeer(serverIndex).peer.getQuorumAddress().getPort();
+ int electionPort = qu.getPeer(serverIndex).peer.getElectionAddress().getPort();
+ int oldClientPort = qu.getPeer(serverIndex).peer.getClientPort();
+ int newClientPort = PortAssignment.unique();
+
+ try(ServerSocket ss = new ServerSocket()) {
+ ss.bind(new InetSocketAddress(getLoopbackAddress(), newClientPort));
+
+ joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort
+ + ":" + electionPort + ":participant;localhost:" + newClientPort);
+
+ // create a /test znode and check that read/write works before
+ // any reconfig is invoked
+ testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
+
+ // Reconfigure
+ reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1);
+ Thread.sleep(1000);
+
+ // The follower reconfiguration will have failed
+ zkArr[serverIndex].close();
+ zkArr[serverIndex] = new ZooKeeper("127.0.0.1:"
+ + newClientPort,
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ public void process(WatchedEvent event) {}});
+
+ try {
+ Thread.sleep(1000);
+ zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1);
+ Assert.fail("New client connected to new client port!");
+ } catch (KeeperException.ConnectionLossException e) {
+ // Exception is expected
+ }
+
+ //The old port should be clear at this stage
+
+ try (ServerSocket ss2 = new ServerSocket()) {
+ ss2.bind(new InetSocketAddress(getLoopbackAddress(), oldClientPort));
+ }
+
+ // Move back to the old port
+ joiningServers.clear();
+ joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort
+ + ":" + electionPort + ":participant;localhost:" + oldClientPort);
+
+ reconfig(zkArr[reconfigIndex], joiningServers, null, null, -1);
+
+ zkArr[serverIndex].close();
+ zkArr[serverIndex] = new ZooKeeper("127.0.0.1:"
+ + oldClientPort,
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ public void process(WatchedEvent event) {}});
+
+ testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
+ testServerHasConfig(zkArr[serverIndex], joiningServers, null);
+ Assert.assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort());
+ }
+ closeAllHandles(zkArr);
+ }
+
@Test
public void testUnspecifiedClientAddress() throws Exception {
int[] ports = new int[3];