You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by an...@apache.org on 2018/11/22 16:56:09 UTC
[1/2] zookeeper git commit: ZOOKEEPER-3152: Port ZK netty stack to
netty4
Repository: zookeeper
Updated Branches:
refs/heads/master 1507f67a0 -> caca06276
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
index 15f993c..7c51a12 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -23,6 +23,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.TestByteBufAllocator;
import org.apache.zookeeper.server.quorum.BufferStats;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Assert;
@@ -31,9 +32,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertThat;
/**
@@ -48,9 +51,17 @@ public class NettyServerCnxnTest extends ClientBase {
public void setUp() throws Exception {
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
"org.apache.zookeeper.server.NettyServerCnxnFactory");
+ NettyServerCnxnFactory.setTestAllocator(TestByteBufAllocator.getInstance());
super.setUp();
}
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ NettyServerCnxnFactory.clearTestAllocator();
+ TestByteBufAllocator.checkForLeaks();
+ }
+
/**
* Test verifies the channel closure - while closing the channel
* servercnxnfactory should remove all channel references to avoid
@@ -110,6 +121,66 @@ public class NettyServerCnxnTest extends ClientBase {
assertThat("Last client response size should be greater than 0 after client request was performed",
clientResponseStats.getLastBufferSize(), greaterThan(0));
+
+ byte[] contents = zk.getData("/a", null, null);
+ assertArrayEquals("unexpected data", "test".getBytes(), contents);
+ }
+ }
+
+ @Test
+ public void testServerSideThrottling() throws IOException, InterruptedException, KeeperException {
+ try (ZooKeeper zk = createClient()) {
+ BufferStats clientResponseStats = serverFactory.getZooKeeperServer().serverStats().getClientResponseStats();
+ assertThat("Last client response size should be initialized with INIT_VALUE",
+ clientResponseStats.getLastBufferSize(), equalTo(BufferStats.INIT_VALUE));
+
+ zk.create("/a", "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ assertThat("Last client response size should be greater than 0 after client request was performed",
+ clientResponseStats.getLastBufferSize(), greaterThan(0));
+
+ for (final ServerCnxn cnxn : serverFactory.cnxns) {
+ final NettyServerCnxn nettyCnxn = ((NettyServerCnxn) cnxn);
+ // Disable receiving data for all open connections ...
+ nettyCnxn.disableRecv();
+ // ... then force a throttled read after 1 second (this puts the read into queuedBuffer) ...
+ nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
+ @Override
+ public void run() {
+ nettyCnxn.getChannel().read();
+ }
+ }, 1, TimeUnit.SECONDS);
+
+ // ... and finally disable throttling after 2 seconds.
+ nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
+ @Override
+ public void run() {
+ nettyCnxn.enableRecv();
+ }
+ }, 2, TimeUnit.SECONDS);
+ }
+
+ byte[] contents = zk.getData("/a", null, null);
+ assertArrayEquals("unexpected data", "test".getBytes(), contents);
+
+ // As above, but don't do the throttled read. Make the request bytes wait in the socket
+ // input buffer until after throttling is turned off. Need to make sure both modes work.
+ for (final ServerCnxn cnxn : serverFactory.cnxns) {
+ final NettyServerCnxn nettyCnxn = ((NettyServerCnxn) cnxn);
+ // Disable receiving data for all open connections ...
+ nettyCnxn.disableRecv();
+ // ... then disable throttling after 2 seconds.
+ nettyCnxn.getChannel().eventLoop().schedule(new Runnable() {
+ @Override
+ public void run() {
+ nettyCnxn.enableRecv();
+ }
+ }, 2, TimeUnit.SECONDS);
+ }
+
+ contents = zk.getData("/a", null, null);
+ assertArrayEquals("unexpected data", "test".getBytes(), contents);
}
}
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
index 6373bb3..c337e3c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientTest.java
@@ -855,6 +855,7 @@ public class ClientTest extends ClientBase {
// Sending a nonexisting opcode should cause the server to disconnect
Assert.assertTrue("failed to disconnect",
clientDisconnected.await(5000, TimeUnit.MILLISECONDS));
+ zk.close();
}
@Test
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
index 684d67a..bbcf869 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NettyNettySuiteBase.java
@@ -22,7 +22,9 @@ import org.apache.zookeeper.ClientCnxnSocketNetty;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.NettyServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -46,4 +48,15 @@ public class NettyNettySuiteBase {
System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
}
+
+ @Before
+ public void setUpTest() throws Exception {
+ TestByteBufAllocatorTestHelper.setTestAllocator(TestByteBufAllocator.getInstance());
+ }
+
+ @After
+ public void tearDownTest() throws Exception {
+ TestByteBufAllocatorTestHelper.clearTestAllocator();
+ TestByteBufAllocator.checkForLeaks();
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
index 5725c17..836eaa0 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/NioNettySuiteBase.java
@@ -20,7 +20,9 @@ package org.apache.zookeeper.test;
import org.apache.zookeeper.server.NettyServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -41,4 +43,15 @@ public class NioNettySuiteBase {
public static void tearDown() {
System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
}
+
+ @Before
+ public void setUpTest() throws Exception {
+ TestByteBufAllocatorTestHelper.setTestAllocator(TestByteBufAllocator.getInstance());
+ }
+
+ @After
+ public void tearDownTest() throws Exception {
+ TestByteBufAllocatorTestHelper.clearTestAllocator();
+ TestByteBufAllocator.checkForLeaks();
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
index 8d10dc9..7b39ab1 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReconfigTest.java
@@ -60,6 +60,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
.getLogger(ReconfigTest.class);
private QuorumUtil qu;
+ private ZooKeeper[] zkArr;
+ private ZooKeeperAdmin[] zkAdminArr;
@Before
public void setup() {
@@ -70,6 +72,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
@After
public void tearDown() throws Exception {
+ closeAllHandles(zkArr, zkAdminArr);
if (qu != null) {
qu.tearDown();
}
@@ -237,12 +240,16 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
}
public static void closeAllHandles(ZooKeeper[] zkArr, ZooKeeperAdmin[] zkAdminArr) throws InterruptedException {
- for (ZooKeeper zk : zkArr)
- if (zk != null)
- zk.close();
- for (ZooKeeperAdmin zkAdmin : zkAdminArr)
- if (zkAdmin != null)
- zkAdmin.close();
+ if (zkArr != null) {
+ for (ZooKeeper zk : zkArr)
+ if (zk != null)
+ zk.close();
+ }
+ if (zkAdminArr != null) {
+ for (ZooKeeperAdmin zkAdmin : zkAdminArr)
+ if (zkAdmin != null)
+ zkAdmin.close();
+ }
}
@Test
@@ -250,8 +257,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
List<String> leavingServers = new ArrayList<String>();
List<String> joiningServers = new ArrayList<String>();
@@ -317,8 +324,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
leavingServers.clear();
joiningServers.clear();
}
-
- closeAllHandles(zkArr, zkAdminArr);
}
/**
@@ -332,8 +337,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(2); // create 5 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
List<String> leavingServers = new ArrayList<String>();
List<String> joiningServers = new ArrayList<String>();
@@ -423,8 +428,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
Assert.assertTrue(qu.getPeer(leavingIndex2).peer.getPeerState() == ServerState.OBSERVING);
testNormalOperation(zkArr[stayingIndex2], zkArr[leavingIndex2]);
testServerHasConfig(zkArr[leavingIndex2], joiningServers, null);
-
- closeAllHandles(zkArr, zkAdminArr);
}
@Test
@@ -432,8 +435,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(3); // create 7 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
// new config will have three of the servers as followers
// two of the servers as observers, and all ports different
@@ -462,8 +465,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu.shutdown(4);
testNormalOperation(zkArr[1], zkArr[2]);
-
- closeAllHandles(zkArr, zkAdminArr);
}
@Test
@@ -471,8 +472,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(2);
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
List<String> leavingServers = new ArrayList<String>();
@@ -493,8 +494,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
testNormalOperation(zkArr[1], zkArr[2]);
for (int i=1; i<=5; i++)
testServerHasConfig(zkArr[i], null, leavingServers);
-
- closeAllHandles(zkArr, zkAdminArr);
}
@SuppressWarnings("unchecked")
@@ -512,8 +511,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
// changing a server's role / port is done by "adding" it with the same
// id but different role / port
@@ -581,7 +580,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
changingIndex = leaderIndex;
}
}
- closeAllHandles(zkArr, zkAdminArr);
}
@Test
@@ -589,8 +587,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
List<String> joiningServers = new ArrayList<String>();
@@ -705,8 +703,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
testNormalOperation(zkArr[follower2], zkArr[follower1]);
testServerHasConfig(zkArr[follower1], joiningServers, null);
testServerHasConfig(zkArr[follower2], joiningServers, null);
-
- closeAllHandles(zkArr, zkAdminArr);
}
@Test
@@ -722,8 +718,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
List<String> joiningServers = new ArrayList<String>();
@@ -796,7 +792,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
testServerHasConfig(zkArr[serverIndex], joiningServers, null);
Assert.assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort());
}
- closeAllHandles(zkArr, zkAdminArr);
}
@Test
@@ -818,8 +813,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(3); // create 7 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
ArrayList<String> members = new ArrayList<String>();
members.add("group.1=3:4:5");
@@ -886,8 +881,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
+ i
+ " doesn't think the quorum system is a majority quorum system!");
}
-
- closeAllHandles(zkArr, zkAdminArr);
}
@Test
@@ -895,7 +888,7 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
+ zkArr = createHandles(qu);
testNormalOperation(zkArr[1], zkArr[2]);
for (int i=1; i<4; i++) {
String configStr = testServerHasConfig(zkArr[i], null, null);
@@ -914,8 +907,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
List<String> leavingServers = new ArrayList<String>();
List<String> joiningServers = new ArrayList<String>();
@@ -980,8 +973,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
// assert remotePeerBean.1 of ReplicatedServer_3
leavingQS3 = peer3.getView().get(new Long(leavingIndex));
assertRemotePeerMXBeanAttributes(leavingQS3, remotePeerBean3);
-
- closeAllHandles(zkArr, zkAdminArr);
}
/**
@@ -993,8 +984,8 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
- ZooKeeper[] zkArr = createHandles(qu);
- ZooKeeperAdmin[] zkAdminArr = createAdminHandles(qu);
+ zkArr = createHandles(qu);
+ zkAdminArr = createAdminHandles(qu);
// changing a server's role / port is done by "adding" it with the same
// id but different role / port
@@ -1055,8 +1046,6 @@ public class ReconfigTest extends ZKTestCase implements DataCallback{
// assert remotePeerBean.1 of ReplicatedServer_3
changingQS3 = peer3.getView().get(new Long(changingIndex));
assertRemotePeerMXBeanAttributes(changingQS3, remotePeerBean3);
-
- closeAllHandles(zkArr, zkAdminArr);
}
private void assertLocalPeerMXBeanAttributes(QuorumPeer qp,
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
new file mode 100644
index 0000000..dc13222
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ResourceLeakDetector;
+
+/**
+ * This is a custom ByteBufAllocator that tracks outstanding allocations and
+ * crashes the program if any of them are leaked.
+ *
+ * Never use this class in production, it will cause your server to run out
+ * of memory! This is because it holds strong references to all allocated
+ * buffers and doesn't release them until checkForLeaks() is called at the
+ * end of a unit test.
+ *
+ * Note: the original code was copied from https://github.com/airlift/drift,
+ * with the permission and encouragement of airlift's author (dain). Airlift
+ * uses the same apache 2.0 license as Zookeeper so this should be ok.
+ *
+ * However, the code was modified to take advantage of Netty's built-in
+ * leak tracking and make a best effort to print details about buffer leaks.
+ *
+ */
+public class TestByteBufAllocator extends PooledByteBufAllocator {
+ private static AtomicReference<TestByteBufAllocator> INSTANCE =
+ new AtomicReference<>(null);
+
+ /**
+ * Get the singleton testing allocator.
+ * @return the singleton allocator, creating it if one does not exist.
+ */
+ public static TestByteBufAllocator getInstance() {
+ TestByteBufAllocator result = INSTANCE.get();
+ if (result == null) {
+ ResourceLeakDetector.Level oldLevel = ResourceLeakDetector.getLevel();
+ ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+ INSTANCE.compareAndSet(null, new TestByteBufAllocator(oldLevel));
+ result = INSTANCE.get();
+ }
+ return result;
+ }
+
+ /**
+ * Destroys the singleton testing allocator and throws an error if any of the
+ * buffers allocated by it have been leaked. Attempts to print leak details to
+ * standard error before throwing, by using netty's built-in leak tracking.
+ * Note that this might not always work, since it only triggers when a buffer
+ * is garbage-collected and calling System.gc() does not guarantee that a buffer
+ * will actually be GC'ed.
+ *
+ * This should be called at the end of a unit test's tearDown() method.
+ */
+ public static void checkForLeaks() {
+ TestByteBufAllocator result = INSTANCE.getAndSet(null);
+ if (result != null) {
+ result.checkInstanceForLeaks();
+ }
+ }
+
+ private final List<ByteBuf> trackedBuffers = new ArrayList<>();
+ private final ResourceLeakDetector.Level oldLevel;
+
+ private TestByteBufAllocator(ResourceLeakDetector.Level oldLevel)
+ {
+ super(false);
+ this.oldLevel = oldLevel;
+ }
+
+ @Override
+ protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity)
+ {
+ return track(super.newHeapBuffer(initialCapacity, maxCapacity));
+ }
+
+ @Override
+ protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity)
+ {
+ return track(super.newDirectBuffer(initialCapacity, maxCapacity));
+ }
+
+ @Override
+ public CompositeByteBuf compositeHeapBuffer(int maxNumComponents)
+ {
+ return track(super.compositeHeapBuffer(maxNumComponents));
+ }
+
+ @Override
+ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents)
+ {
+ return track(super.compositeDirectBuffer(maxNumComponents));
+ }
+
+ private synchronized CompositeByteBuf track(CompositeByteBuf byteBuf)
+ {
+ trackedBuffers.add(Objects.requireNonNull(byteBuf));
+ return byteBuf;
+ }
+
+ private synchronized ByteBuf track(ByteBuf byteBuf)
+ {
+ trackedBuffers.add(Objects.requireNonNull(byteBuf));
+ return byteBuf;
+ }
+
+ private void checkInstanceForLeaks()
+ {
+ try {
+ long referencedBuffersCount = 0;
+ synchronized (this) {
+ referencedBuffersCount = trackedBuffers.stream()
+ .filter(byteBuf -> byteBuf.refCnt() > 0)
+ .count();
+ // Make tracked buffers eligible for GC
+ trackedBuffers.clear();
+ }
+ // Throw an error if there were any leaked buffers
+ if (referencedBuffersCount > 0) {
+ // Trigger a GC. This will hopefully (but not necessarily) print
+ // details about detected leaks to standard error before the error
+ // is thrown.
+ System.gc();
+ throw new AssertionError("Found a netty ByteBuf leak!");
+ }
+ } finally {
+ ResourceLeakDetector.setLevel(oldLevel);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java
new file mode 100644
index 0000000..de5e751
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestByteBufAllocatorTestHelper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.zookeeper.ClientCnxnSocketNetty;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+
+/**
+ * Uses reflection to call package-private methods in Netty connection classes
+ * to set/clear the test ByteBufAllocator.
+ */
+public class TestByteBufAllocatorTestHelper {
+ public static void setTestAllocator(ByteBufAllocator allocator)
+ throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ Method m1 = NettyServerCnxnFactory.class.getDeclaredMethod("setTestAllocator", ByteBufAllocator.class);
+ m1.setAccessible(true);
+ m1.invoke(null, allocator);
+ Method m2 = ClientCnxnSocketNetty.class.getDeclaredMethod("setTestAllocator", ByteBufAllocator.class);
+ m2.setAccessible(true);
+ m2.invoke(null, allocator);
+ }
+
+ public static void clearTestAllocator()
+ throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+ Method m1 = NettyServerCnxnFactory.class.getDeclaredMethod("clearTestAllocator");
+ m1.setAccessible(true);
+ m1.invoke(null);
+ Method m2 = ClientCnxnSocketNetty.class.getDeclaredMethod("clearTestAllocator");
+ m2.setAccessible(true);
+ m2.invoke(null);
+ }
+}
\ No newline at end of file
[2/2] zookeeper git commit: ZOOKEEPER-3152: Port ZK netty stack to
netty4
Posted by an...@apache.org.
ZOOKEEPER-3152: Port ZK netty stack to netty4
Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble.
FB Reviewers: nixon
Author: Ilya Maykov <il...@fb.com>
Reviewers: andor@apache.org
Closes #669 from ivmaykov/ZOOKEEPER-3152
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/caca0627
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/caca0627
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/caca0627
Branch: refs/heads/master
Commit: caca062767c36525e6ecead2ae0f34c447394809
Parents: 1507f67
Author: Ilya Maykov <il...@fb.com>
Authored: Thu Nov 22 17:56:01 2018 +0100
Committer: Andor Molnar <an...@apache.org>
Committed: Thu Nov 22 17:56:01 2018 +0100
----------------------------------------------------------------------
build.xml | 2 +-
ivy.xml | 4 +-
.../org/apache/zookeeper/ClientCnxnSocket.java | 9 +-
.../apache/zookeeper/ClientCnxnSocketNIO.java | 4 +-
.../apache/zookeeper/ClientCnxnSocketNetty.java | 312 +++++++-----
.../org/apache/zookeeper/common/NettyUtils.java | 76 +++
.../zookeeper/server/NettyServerCnxn.java | 364 +++++++++-----
.../server/NettyServerCnxnFactory.java | 474 ++++++++++---------
.../server/quorum/UnifiedServerSocket.java | 6 +-
.../apache/zookeeper/ClientCnxnSocketTest.java | 13 +
.../zookeeper/server/NettyServerCnxnTest.java | 71 +++
.../org/apache/zookeeper/test/ClientTest.java | 1 +
.../zookeeper/test/NettyNettySuiteBase.java | 13 +
.../zookeeper/test/NioNettySuiteBase.java | 13 +
.../org/apache/zookeeper/test/ReconfigTest.java | 79 ++--
.../zookeeper/test/TestByteBufAllocator.java | 152 ++++++
.../test/TestByteBufAllocatorTestHelper.java | 52 ++
17 files changed, 1126 insertions(+), 519 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 3411025..5868532 100644
--- a/build.xml
+++ b/build.xml
@@ -36,7 +36,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
<property name="audience-annotations.version" value="0.5.0" />
- <property name="netty.version" value="3.10.6.Final"/>
+ <property name="netty.version" value="4.1.29.Final"/>
<property name="junit.version" value="4.12"/>
<property name="mockito.version" value="1.8.5"/>
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index 8692640..c7f79b6 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -59,8 +59,8 @@
<dependency org="org.apache.yetus" name="audience-annotations"
rev="${audience-annotations.version}"/>
- <dependency org="io.netty" name="netty" conf="default" rev="${netty.version}">
- <artifact name="netty" type="jar" conf="default"/>
+ <dependency org="io.netty" name="netty-all" conf="default" rev="${netty.version}">
+ <artifact name="netty-all" type="jar" conf="default"/>
</dependency>
<dependency org="com.googlecode.json-simple" name="json-simple" rev="${json.version}" >
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
index 51ae8bf..ba3806c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.jute.BinaryInputArchive;
import org.apache.zookeeper.ClientCnxn.Packet;
@@ -59,8 +60,8 @@ abstract class ClientCnxnSocket {
* readLength() to receive the full message.
*/
protected ByteBuffer incomingBuffer = lenBuffer;
- protected long sentCount = 0;
- protected long recvCount = 0;
+ protected final AtomicLong sentCount = new AtomicLong(0L);
+ protected final AtomicLong recvCount = new AtomicLong(0L);
protected long lastHeard;
protected long lastSend;
protected long now;
@@ -95,11 +96,11 @@ abstract class ClientCnxnSocket {
}
long getSentCount() {
- return sentCount;
+ return sentCount.get();
}
long getRecvCount() {
- return recvCount;
+ return recvCount.get();
}
void updateLastHeard() {
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
index f17a819..4c97721 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -82,7 +82,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
- recvCount++;
+ recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
@@ -122,7 +122,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
- sentCount++;
+ sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
index 34c3db3..74d1283 100755
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
@@ -18,46 +18,45 @@
package org.apache.zookeeper;
-import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
-import org.apache.zookeeper.ClientCnxn.Packet;
-import org.apache.zookeeper.client.ZKClientConfig;
-import org.apache.zookeeper.common.ClientX509Util;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.NettyUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import static org.apache.zookeeper.common.X509Exception.SSLContextException;
/**
@@ -68,18 +67,21 @@ import static org.apache.zookeeper.common.X509Exception.SSLContextException;
public class ClientCnxnSocketNetty extends ClientCnxnSocket {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);
- ChannelFactory channelFactory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- Channel channel;
- CountDownLatch firstConnect;
- ChannelFuture connectFuture;
- Lock connectLock = new ReentrantLock();
- AtomicBoolean disconnected = new AtomicBoolean();
- AtomicBoolean needSasl = new AtomicBoolean();
- Semaphore waitSasl = new Semaphore(0);
+ private final EventLoopGroup eventLoopGroup;
+ private Channel channel;
+ private CountDownLatch firstConnect;
+ private ChannelFuture connectFuture;
+ private final Lock connectLock = new ReentrantLock();
+ private final AtomicBoolean disconnected = new AtomicBoolean();
+ private final AtomicBoolean needSasl = new AtomicBoolean();
+ private final Semaphore waitSasl = new Semaphore(0);
+
+ private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+ new AtomicReference<>(null);
ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
this.clientConfig = clientConfig;
+ eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup();
initProperties();
}
@@ -103,59 +105,90 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
boolean isConnected() {
// Assuming that isConnected() is only used to initiate connection,
// not used by some other connection status judgement.
- return channel != null;
+ connectLock.lock();
+ try {
+ return channel != null || connectFuture != null;
+ } finally {
+ connectLock.unlock();
+ }
+ }
+
+ private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
+ ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+ if (testAllocator != null) {
+ return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator);
+ } else {
+ return bootstrap;
+ }
}
@Override
void connect(InetSocketAddress addr) throws IOException {
firstConnect = new CountDownLatch(1);
- ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
-
- bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
- bootstrap.setOption("soLinger", -1);
- bootstrap.setOption("tcpNoDelay", true);
-
- connectFuture = bootstrap.connect(addr);
- connectFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- // this lock guarantees that channel won't be assgined after cleanup().
- connectLock.lock();
- try {
- if (!channelFuture.isSuccess() || connectFuture == null) {
- LOG.info("future isn't success, cause: {}", channelFuture.getCause());
- return;
- }
- // setup channel, variables, connection, etc.
- channel = channelFuture.getChannel();
-
- disconnected.set(false);
- initialized = false;
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
-
- sendThread.primeConnection();
- updateNow();
- updateLastSendAndHeard();
-
- if (sendThread.tunnelAuthInProgress()) {
- waitSasl.drainPermits();
- needSasl.set(true);
- sendPrimePacket();
- } else {
- needSasl.set(false);
- }
+ Bootstrap bootstrap = new Bootstrap()
+ .group(eventLoopGroup)
+ .channel(NettyUtils.nioOrEpollSocketChannel())
+ .option(ChannelOption.SO_LINGER, -1)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
+ bootstrap = configureBootstrapAllocator(bootstrap);
+ bootstrap.validate();
- // we need to wake up on first connect to avoid timeout.
- wakeupCnxn();
- firstConnect.countDown();
- LOG.info("channel is connected: {}", channelFuture.getChannel());
- } finally {
- connectLock.unlock();
+ connectLock.lock();
+ try {
+ connectFuture = bootstrap.connect(addr);
+ connectFuture.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ // this lock guarantees that channel won't be assigned after cleanup().
+ connectLock.lock();
+ try {
+ if (!channelFuture.isSuccess()) {
+ LOG.info("future isn't success, cause:", channelFuture.cause());
+ return;
+ } else if (connectFuture == null) {
+ LOG.info("connect attempt cancelled");
+ // If the connect attempt was cancelled but succeeded
+ // anyway, make sure to close the channel, otherwise
+ // we may leak a file descriptor.
+ channelFuture.channel().close();
+ return;
+ }
+ // setup channel, variables, connection, etc.
+ channel = channelFuture.channel();
+
+ disconnected.set(false);
+ initialized = false;
+ lenBuffer.clear();
+ incomingBuffer = lenBuffer;
+
+ sendThread.primeConnection();
+ updateNow();
+ updateLastSendAndHeard();
+
+ if (sendThread.tunnelAuthInProgress()) {
+ waitSasl.drainPermits();
+ needSasl.set(true);
+ sendPrimePacket();
+ } else {
+ needSasl.set(false);
+ }
+ LOG.info("channel is connected: {}", channelFuture.channel());
+ } finally {
+ connectFuture = null;
+ connectLock.unlock();
+ // need to wake on connect success or failure to avoid
+ // timing out ClientCnxn.SendThread which may be
+ // blocked waiting for first connect in doTransport().
+ wakeupCnxn();
+ firstConnect.countDown();
+ }
}
- }
- });
+ });
+ } finally {
+ connectLock.unlock();
+ }
}
@Override
@@ -163,11 +196,11 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
connectLock.lock();
try {
if (connectFuture != null) {
- connectFuture.cancel();
+ connectFuture.cancel(false);
connectFuture = null;
}
if (channel != null) {
- channel.close().awaitUninterruptibly();
+ channel.close().syncUninterruptibly();
channel = null;
}
} finally {
@@ -184,7 +217,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
@Override
void close() {
- channelFactory.releaseExternalResources();
+ if (!eventLoopGroup.isShuttingDown()) {
+ eventLoopGroup.shutdownGracefully();
+ }
}
@Override
@@ -199,6 +234,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
@Override
void packetAdded() {
+ // NO-OP. Adding a packet will already wake up a netty connection
+ // so we don't need to add a dummy packet to the queue to trigger
+ // a wake-up.
}
@Override
@@ -230,13 +268,11 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
return;
}
} else {
- if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
- return;
- }
+ head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
}
// check if being waken up on closing.
if (!sendThread.getZkState().isAlive()) {
- // adding back the patck to notify of failure in conLossPacket().
+ // adding back the packet to notify of failure in conLossPacket().
addBack(head);
return;
}
@@ -261,18 +297,46 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
}
}
- private void sendPkt(Packet p) {
+ /**
+ * Sends a packet to the remote peer and flushes the channel.
+ * @param p packet to send.
+ * @return a ChannelFuture that will complete when the write operation
+ * succeeds or fails.
+ */
+ private ChannelFuture sendPktAndFlush(Packet p) {
+ return sendPkt(p, true);
+ }
+
+ /**
+ * Sends a packet to the remote peer but does not flush() the channel.
+ * @param p packet to send.
+ * @return a ChannelFuture that will complete when the write operation
+ * succeeds or fails.
+ */
+ private ChannelFuture sendPktOnly(Packet p) {
+ return sendPkt(p, false);
+ }
+
+ private ChannelFuture sendPkt(Packet p, boolean doFlush) {
// Assuming the packet will be sent out successfully. Because if it fails,
// the channel will close and clean up queues.
p.createBB();
updateLastSend();
- sentCount++;
- channel.write(ChannelBuffers.wrappedBuffer(p.bb));
+ ChannelFuture result = channel.write(Unpooled.wrappedBuffer(p.bb));
+ result.addListener(f -> {
+ if (f.isSuccess()) {
+ sentCount.getAndIncrement();
+ }
+ });
+ if (doFlush) {
+ channel.flush();
+ }
+ return result;
}
private void sendPrimePacket() {
// assuming the first packet is the priming packet.
- sendPkt(outgoingQueue.remove());
+ sendPktAndFlush(outgoingQueue.remove());
}
/**
@@ -290,13 +354,16 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
pendingQueue.add(p);
}
}
- sendPkt(p);
+ sendPktOnly(p);
}
if (outgoingQueue.isEmpty()) {
break;
}
p = outgoingQueue.remove();
}
+ // TODO: maybe we should flush in the loop above every N packets/bytes?
+ // But, how do we determine the right value for N ...
+ channel.flush();
}
@Override
@@ -304,19 +371,19 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
if (channel == null) {
throw new IOException("channel has been closed");
}
- sendPkt(p);
+ sendPktAndFlush(p);
}
@Override
SocketAddress getRemoteSocketAddress() {
Channel copiedChanRef = channel;
- return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress();
+ return (copiedChanRef == null) ? null : copiedChanRef.remoteAddress();
}
@Override
SocketAddress getLocalSocketAddress() {
Channel copiedChanRef = channel;
- return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress();
+ return (copiedChanRef == null) ? null : copiedChanRef.localAddress();
}
@Override
@@ -345,7 +412,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
* ZKClientPipelineFactory is the netty pipeline factory for this netty
* connection implementation.
*/
- private class ZKClientPipelineFactory implements ChannelPipelineFactory {
+ private class ZKClientPipelineFactory extends ChannelInitializer<SocketChannel> {
private SSLContext sslContext = null;
private SSLEngine sslEngine = null;
private String host;
@@ -357,13 +424,12 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
}
@Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) {
initSSL(pipeline);
}
pipeline.addLast("handler", new ZKClientHandler());
- return pipeline;
}
// The synchronized is to prevent the race on shared variable "sslEngine".
@@ -375,7 +441,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
sslEngine.setUseClientMode(true);
}
pipeline.addLast("ssl", new SslHandler(sslEngine));
- LOG.info("SSL handler added for channel: {}", pipeline.getChannel());
+ LOG.info("SSL handler added for channel: {}", pipeline.channel());
}
}
@@ -383,13 +449,12 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
* ZKClientHandler is the netty handler that sits in netty upstream last
* place. It mainly handles read traffic and helps synchronize connection state.
*/
- private class ZKClientHandler extends SimpleChannelUpstreamHandler {
+ private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
AtomicBoolean channelClosed = new AtomicBoolean(false);
@Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception {
- LOG.info("channel is disconnected: {}", ctx.getChannel());
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ LOG.info("channel is disconnected: {}", ctx.channel());
cleanup();
}
@@ -406,11 +471,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx,
- MessageEvent e) throws Exception {
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
updateNow();
- ChannelBuffer buf = (ChannelBuffer) e.getMessage();
- while (buf.readable()) {
+ while (buf.isReadable()) {
if (incomingBuffer.remaining() > buf.readableBytes()) {
int newLimit = incomingBuffer.position()
+ buf.readableBytes();
@@ -422,7 +485,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
- recvCount++;
+ recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
readConnectResult();
@@ -439,13 +502,34 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
}
}
wakeupCnxn();
+ // Note: SimpleChannelInboundHandler releases the ByteBuf for us
+ // so we don't need to do it.
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx,
- ExceptionEvent e) throws Exception {
- LOG.warn("Exception caught: {}", e, e.getCause());
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.warn("Exception caught", cause);
cleanup();
}
}
+
+ /**
+ * Sets the test ByteBufAllocator. This allocator will be used by all
+ * future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ * @param allocator the ByteBufAllocator to use for all netty buffer
+ * allocations.
+ */
+ static void setTestAllocator(ByteBufAllocator allocator) {
+ TEST_ALLOCATOR.set(allocator);
+ }
+
+ /**
+ * Clears the test ByteBufAllocator. The default allocator will be used
+ * by all future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ */
+ static void clearTestAllocator() {
+ TEST_ALLOCATOR.set(null);
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
new file mode 100644
index 0000000..5883296
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Helper methods for netty code.
+ */
+public class NettyUtils {
+ /**
+ * If {@link Epoll#isAvailable()} <code>== true</code>, returns a new
+ * {@link EpollEventLoopGroup}, otherwise returns a new
+ * {@link NioEventLoopGroup}.
+ * @return a new {@link EventLoopGroup}.
+ */
+ public static EventLoopGroup newNioOrEpollEventLoopGroup() {
+ if (Epoll.isAvailable()) {
+ return new EpollEventLoopGroup();
+ } else {
+ return new NioEventLoopGroup();
+ }
+ }
+
+ /**
+ * If {@link Epoll#isAvailable()} <code>== true</code>, returns
+ * {@link EpollSocketChannel}, otherwise returns {@link NioSocketChannel}.
+ * @return a socket channel class.
+ */
+ public static Class<? extends SocketChannel> nioOrEpollSocketChannel() {
+ if (Epoll.isAvailable()) {
+ return EpollSocketChannel.class;
+ } else {
+ return NioSocketChannel.class;
+ }
+ }
+
+ /**
+ * If {@link Epoll#isAvailable()} <code>== true</code>, returns
+ * {@link EpollServerSocketChannel}, otherwise returns
+ * {@link NioServerSocketChannel}.
+ * @return a server socket channel class.
+ */
+ public static Class<? extends ServerSocketChannel> nioOrEpollServerSocketChannel() {
+ if (Epoll.isAvailable()) {
+ return EpollServerSocketChannel.class;
+ } else {
+ return NioServerSocketChannel.class;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index f0a8f7f..311d3c1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -18,23 +18,26 @@
package org.apache.zookeeper.server;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-
import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
import java.security.cert.Certificate;
import java.util.Arrays;
-import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.util.ReferenceCountUtil;
import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.proto.ReplyHeader;
@@ -43,29 +46,23 @@ import org.apache.zookeeper.server.command.CommandExecutor;
import org.apache.zookeeper.server.command.FourLetterCommands;
import org.apache.zookeeper.server.command.NopCommand;
import org.apache.zookeeper.server.command.SetTraceMaskCommand;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NettyServerCnxn extends ServerCnxn {
private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);
- Channel channel;
- ChannelBuffer queuedBuffer;
- volatile boolean throttled;
- ByteBuffer bb;
- ByteBuffer bbLen = ByteBuffer.allocate(4);
- long sessionId;
- int sessionTimeout;
- Certificate[] clientChain;
- volatile boolean closingChannel;
-
- NettyServerCnxnFactory factory;
- boolean initialized;
+ private final Channel channel;
+ private ByteBuf queuedBuffer;
+ private final AtomicBoolean throttled = new AtomicBoolean(false);
+ private ByteBuffer bb;
+ private final ByteBuffer bbLen = ByteBuffer.allocate(4);
+ private long sessionId;
+ private int sessionTimeout;
+ private Certificate[] clientChain;
+ private volatile boolean closingChannel;
+
+ private final NettyServerCnxnFactory factory;
+ private boolean initialized;
NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
super(zks);
@@ -82,8 +79,8 @@ public class NettyServerCnxn extends ServerCnxn {
closingChannel = true;
if (LOG.isDebugEnabled()) {
- LOG.debug("close called for sessionid:0x"
- + Long.toHexString(sessionId));
+ LOG.debug("close called for sessionid:0x{}",
+ Long.toHexString(sessionId));
}
setStale();
@@ -92,28 +89,23 @@ public class NettyServerCnxn extends ServerCnxn {
// connection bean leak under certain race conditions.
factory.unregisterConnection(this);
- synchronized(factory.cnxns){
- // if this is not in cnxns then it's already closed
- if (!factory.cnxns.remove(this)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("cnxns size:" + factory.cnxns.size());
- }
- return;
- }
+ // if this is not in cnxns then it's already closed
+ if (!factory.cnxns.remove(this)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("close in progress for sessionid:0x"
- + Long.toHexString(sessionId));
+ LOG.debug("cnxns size:{}", factory.cnxns.size());
}
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("close in progress for sessionid:0x{}",
+ Long.toHexString(sessionId));
+ }
- factory.removeCnxnFromSessionMap(this);
+ factory.removeCnxnFromSessionMap(this);
- synchronized (factory.ipMap) {
- Set<NettyServerCnxn> s =
- factory.ipMap.get(((InetSocketAddress)channel
- .getRemoteAddress()).getAddress());
- s.remove(this);
- }
- }
+ factory.removeCnxnFromIpMap(
+ this,
+ ((InetSocketAddress)channel.remoteAddress()).getAddress());
if (zkServer != null) {
zkServer.removeCnxn(this);
@@ -123,7 +115,14 @@ public class NettyServerCnxn extends ServerCnxn {
// Since we don't check on the futures created by write calls to the channel complete we need to make sure
// that all writes have been completed before closing the channel or we risk data loss
// See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html
- channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ future.channel().close().addListener(f -> releaseQueuedBuffer());
+ }
+ });
+ } else {
+ channel.eventLoop().execute(this::releaseQueuedBuffer);
}
}
@@ -160,21 +159,6 @@ public class NettyServerCnxn extends ServerCnxn {
}
}
- static class ResumeMessageEvent implements MessageEvent {
- Channel channel;
- ResumeMessageEvent(Channel channel) {
- this.channel = channel;
- }
- @Override
- public Object getMessage() {return null;}
- @Override
- public SocketAddress getRemoteAddress() {return null;}
- @Override
- public Channel getChannel() {return channel;}
- @Override
- public ChannelFuture getFuture() {return null;}
- };
-
@Override
public void sendResponse(ReplyHeader h, Record r, String tag)
throws IOException {
@@ -192,28 +176,18 @@ public class NettyServerCnxn extends ServerCnxn {
}
@Override
- public void enableRecv() {
- if (throttled) {
- throttled = false;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sending unthrottle event " + this);
- }
- channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel));
- }
- }
-
- @Override
public void sendBuffer(ByteBuffer sendBuffer) {
if (sendBuffer == ServerCnxnFactory.closeConn) {
close();
return;
}
- channel.write(wrappedBuffer(sendBuffer));
- packetSent();
+ channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(f -> {
+ if (f.isSuccess()) {
+ packetSent();
+ }
+ });
}
-
-
/**
* This class wraps the sendBuffer method of NIOServerCnxn. It is
* responsible for chunking up the response to a client. Rather
@@ -255,9 +229,7 @@ public class NettyServerCnxn extends ServerCnxn {
}
/** Return if four letter word found and responded to, otw false **/
- private boolean checkFourLetterWord(final Channel channel,
- ChannelBuffer message, final int len) throws IOException
- {
+ private boolean checkFourLetterWord(final Channel channel, ByteBuf message, final int len) {
// We take advantage of the limited size of the length to look
// for cmds. They are all 4-bytes which fits inside of an int
if (!FourLetterCommands.isKnown(len)) {
@@ -266,7 +238,10 @@ public class NettyServerCnxn extends ServerCnxn {
String cmd = FourLetterCommands.getCommandString(len);
- channel.setInterestOps(0).awaitUninterruptibly();
+ // Stops automatic reads of incoming data on this channel. We don't
+ // expect any more traffic from the client when processing a 4LW
+ // so this shouldn't break anything.
+ channel.config().setAutoRead(false);
packetReceived(4);
final PrintWriter pwriter = new PrintWriter(
@@ -281,8 +256,7 @@ public class NettyServerCnxn extends ServerCnxn {
return true;
}
- LOG.info("Processing " + cmd + " command from "
- + channel.getRemoteAddress());
+ LOG.info("Processing {} command from {}", cmd, channel.remoteAddress());
if (len == FourLetterCommands.setTraceMaskCmd) {
ByteBuffer mask = ByteBuffer.allocate(8);
@@ -299,19 +273,126 @@ public class NettyServerCnxn extends ServerCnxn {
}
}
- public void receiveMessage(ChannelBuffer message) {
+ /**
+ * Process incoming message. This should only be called from the event
+ * loop thread.
+ * @param buf the message bytes to process.
+ */
+ void processMessage(ByteBuf buf) {
+ assert channel.eventLoop().inEventLoop();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("0x{} queuedBuffer: {}",
+ Long.toHexString(sessionId),
+ queuedBuffer);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("0x{} buf {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(buf));
+ }
+
+ if (throttled.get()) {
+ LOG.debug("Received message while throttled");
+ // we are throttled, so we need to queue
+ if (queuedBuffer == null) {
+ LOG.debug("allocating queue");
+ queuedBuffer = channel.alloc().buffer(buf.readableBytes());
+ }
+ queuedBuffer.writeBytes(buf);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("0x{} queuedBuffer {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(queuedBuffer));
+ }
+ } else {
+ LOG.debug("not throttled");
+ if (queuedBuffer != null) {
+ queuedBuffer.writeBytes(buf);
+ processQueuedBuffer();
+ } else {
+ receiveMessage(buf);
+ // Have to check !closingChannel, because an error in
+ // receiveMessage() could have led to close() being called.
+ if (!closingChannel && buf.isReadable()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Before copy {}", buf);
+ }
+ if (queuedBuffer == null) {
+ queuedBuffer = channel.alloc().buffer(buf.readableBytes());
+ }
+ queuedBuffer.writeBytes(buf);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Copy is {}", queuedBuffer);
+ LOG.trace("0x{} queuedBuffer {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(queuedBuffer));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Try to process previously queued message. This should only be called
+ * from the event loop thread.
+ */
+ void processQueuedBuffer() {
+ assert channel.eventLoop().inEventLoop();
+ if (queuedBuffer != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("processing queue 0x{} queuedBuffer {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(queuedBuffer));
+ }
+ receiveMessage(queuedBuffer);
+ if (closingChannel) {
+ // close() could have been called if receiveMessage() failed
+ LOG.debug("Processed queue - channel closed, dropping remaining bytes");
+ } else if (!queuedBuffer.isReadable()) {
+ LOG.debug("Processed queue - no bytes remaining");
+ releaseQueuedBuffer();
+ } else {
+ LOG.debug("Processed queue - bytes remaining");
+ }
+ } else {
+ LOG.debug("queue empty");
+ }
+ }
+
+ /**
+ * Clean up queued buffer once it's no longer needed. This should only be
+ * called from the event loop thread.
+ */
+ private void releaseQueuedBuffer() {
+ assert channel.eventLoop().inEventLoop();
+ if (queuedBuffer != null) {
+ ReferenceCountUtil.release(queuedBuffer);
+ queuedBuffer = null;
+ }
+ }
+
+ /**
+ * Receive a message, which can come from the queued buffer or from a new
+ * buffer coming in over the channel. This should only be called from the
+ * event loop thread.
+ * @param message the message bytes to process.
+ */
+ private void receiveMessage(ByteBuf message) {
+ assert channel.eventLoop().inEventLoop();
try {
- while(message.readable() && !throttled) {
+ while(message.isReadable() && !throttled.get()) {
if (bb != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("message readable " + message.readableBytes()
- + " bb len " + bb.remaining() + " " + bb);
+ LOG.trace("message readable {} bb len {} {}",
+ message.readableBytes(),
+ bb.remaining(),
+ bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
- LOG.trace(Long.toHexString(sessionId)
- + " bb 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
+ LOG.trace("0x{} bb {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() > message.readableBytes()) {
@@ -322,16 +403,15 @@ public class NettyServerCnxn extends ServerCnxn {
bb.limit(bb.capacity());
if (LOG.isTraceEnabled()) {
- LOG.trace("after readBytes message readable "
- + message.readableBytes()
- + " bb len " + bb.remaining() + " " + bb);
+ LOG.trace("after readBytes message readable {} bb len {} {}",
+ message.readableBytes(),
+ bb.remaining(),
+ bb);
ByteBuffer dat = bb.duplicate();
dat.flip();
- LOG.trace("after readbytes "
- + Long.toHexString(sessionId)
- + " bb 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
+ LOG.trace("after readbytes 0x{} bb {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (bb.remaining() == 0) {
bb.flip();
@@ -342,10 +422,14 @@ public class NettyServerCnxn extends ServerCnxn {
throw new IOException("ZK down");
}
if (initialized) {
+ // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
+ // we could implement zero-copy queueing.
zks.processPacket(this, bb);
} else {
- LOG.debug("got conn req request from "
- + getRemoteSocketAddress());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("got conn req request from {}",
+ getRemoteSocketAddress());
+ }
zks.processConnectRequest(this, bb);
initialized = true;
}
@@ -353,15 +437,14 @@ public class NettyServerCnxn extends ServerCnxn {
}
} else {
if (LOG.isTraceEnabled()) {
- LOG.trace("message readable "
- + message.readableBytes()
- + " bblenrem " + bbLen.remaining());
+ LOG.trace("message readable {} bblenrem {}",
+ message.readableBytes(),
+ bbLen.remaining());
ByteBuffer dat = bbLen.duplicate();
dat.flip();
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(dat)));
+ LOG.trace("0x{} bbLen {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
}
if (message.readableBytes() < bbLen.remaining()) {
@@ -373,15 +456,15 @@ public class NettyServerCnxn extends ServerCnxn {
bbLen.flip();
if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen 0x"
- + ChannelBuffers.hexDump(
- ChannelBuffers.copiedBuffer(bbLen)));
+ LOG.trace("0x{} bbLen {}",
+ Long.toHexString(sessionId),
+ ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
}
int len = bbLen.getInt();
if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(sessionId)
- + " bbLen len is " + len);
+ LOG.trace("0x{} bbLen len is {}",
+ Long.toHexString(sessionId),
+ len);
}
bbLen.clear();
@@ -403,16 +486,38 @@ public class NettyServerCnxn extends ServerCnxn {
}
}
+ /**
+ * An event that triggers a change in the channel's "Auto Read" setting.
+ * Used for throttling. By using an enum we can treat the two values as
+ * singletons and compare with ==.
+ */
+ enum AutoReadEvent {
+ DISABLE,
+ ENABLE
+ }
+
+ /**
+ * Note that the netty implementation ignores the <code>waitDisableRecv</code>
+ * parameter and is always asynchronous.
+ * @param waitDisableRecv ignored by this implementation.
+ */
@Override
public void disableRecv(boolean waitDisableRecv) {
- throttled = true;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Throttling - disabling recv " + this);
+ if (throttled.compareAndSet(false, true)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Throttling - disabling recv {}", this);
+ }
+ channel.pipeline().fireUserEventTriggered(AutoReadEvent.DISABLE);
}
- ChannelFuture cf = channel.setReadable(false);
+ }
- if (waitDisableRecv) {
- cf.awaitUninterruptibly();
+ @Override
+ public void enableRecv() {
+ if (throttled.compareAndSet(true, false)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending unthrottle event {}", this);
+ }
+ channel.pipeline().fireUserEventTriggered(AutoReadEvent.ENABLE);
}
}
@@ -423,12 +528,26 @@ public class NettyServerCnxn extends ServerCnxn {
@Override
public int getInterestOps() {
- return channel.getInterestOps();
+ // This might not be 100% right, but it's only used for printing
+ // connection info in the netty implementation so it's probably ok.
+ if (channel == null || !channel.isOpen()) {
+ return 0;
+ }
+ int interestOps = 0;
+ if (!throttled.get()) {
+ interestOps |= SelectionKey.OP_READ;
+ }
+ if (!channel.isWritable()) {
+ // OP_READ means "can read", but OP_WRITE means "cannot write",
+ // it's weird.
+ interestOps |= SelectionKey.OP_WRITE;
+ }
+ return interestOps;
}
@Override
public InetSocketAddress getRemoteSocketAddress() {
- return (InetSocketAddress)channel.getRemoteAddress();
+ return (InetSocketAddress)channel.remoteAddress();
}
/** Send close connection packet to the client.
@@ -469,4 +588,9 @@ public class NettyServerCnxn extends ServerCnxn {
clientChain = Arrays.copyOf(chain, chain.length);
}
}
+
+ // For tests and NettyServerCnxnFactory only, thus package-private.
+ Channel getChannel() {
+ return channel;
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index d3abf38..99de0e6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -18,41 +18,6 @@
package org.apache.zookeeper.server;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.common.ClientX509Util;
-import org.apache.zookeeper.common.X509Exception;
-import org.apache.zookeeper.common.X509Exception.SSLContextException;
-import org.apache.zookeeper.server.auth.ProviderRegistry;
-import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandler.Sharable;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.WriteCompletionEvent;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLPeerUnverifiedException;
-import javax.net.ssl.SSLSession;
-import javax.net.ssl.X509KeyManager;
-import javax.net.ssl.X509TrustManager;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -61,51 +26,86 @@ import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.X509KeyManager;
+import javax.net.ssl.X509TrustManager;
-import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.AttributeKey;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.DefaultEventExecutor;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.NettyUtils;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.common.X509Exception.SSLContextException;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NettyServerCnxnFactory extends ServerCnxnFactory {
private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class);
- ServerBootstrap bootstrap;
- Channel parentChannel;
- ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns");
- Map<InetAddress, Set<NettyServerCnxn>> ipMap =
- new HashMap<InetAddress, Set<NettyServerCnxn>>( );
- InetSocketAddress localAddress;
- int maxClientCnxns = 60;
- ClientX509Util x509Util;
+ private final ServerBootstrap bootstrap;
+ private Channel parentChannel;
+ private final ChannelGroup allChannels =
+ new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
+ // Access to ipMap or to any Set contained in the map needs to be
+ // protected with synchronized (ipMap) { ... }
+ private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
+ private InetSocketAddress localAddress;
+ private int maxClientCnxns = 60;
+ private final ClientX509Util x509Util;
+
+ private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE =
+ AttributeKey.valueOf("NettyServerCnxn");
+
+ private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR =
+ new AtomicReference<>(null);
/**
- * This is an inner class since we need to extend SimpleChannelHandler, but
+ * This is an inner class since we need to extend ChannelDuplexHandler, but
* NettyServerCnxnFactory already extends ServerCnxnFactory. By making it inner
* this class gets access to the member variables and methods.
*/
@Sharable
- class CnxnChannelHandler extends SimpleChannelHandler {
-
- @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
- throws Exception
- {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Channel closed " + e);
- }
- allChannels.remove(ctx.getChannel());
- }
+ class CnxnChannelHandler extends ChannelDuplexHandler {
@Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception
- {
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
- LOG.trace("Channel connected " + e);
+ LOG.trace("Channel active {}", ctx.channel());
}
- Channel channel = ctx.getChannel();
- InetAddress addr = ((InetSocketAddress) channel.getRemoteAddress())
+ final Channel channel = ctx.channel();
+ InetAddress addr = ((InetSocketAddress) channel.remoteAddress())
.getAddress();
if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
LOG.warn("Too many connections from {} - max is {}", addr,
@@ -116,170 +116,104 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
NettyServerCnxn cnxn = new NettyServerCnxn(channel,
zkServer, NettyServerCnxnFactory.this);
- ctx.setAttachment(cnxn);
+ ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);
if (secure) {
- SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
- ChannelFuture handshakeFuture = sslHandler.handshake();
+ SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
+ Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
} else {
- allChannels.add(ctx.getChannel());
+ allChannels.add(ctx.channel());
addCnxn(cnxn);
}
}
@Override
- public void channelDisconnected(ChannelHandlerContext ctx,
- ChannelStateEvent e) throws Exception
- {
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (LOG.isTraceEnabled()) {
- LOG.trace("Channel disconnected " + e);
+ LOG.trace("Channel inactive {}", ctx.channel());
}
- NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+ allChannels.remove(ctx.channel());
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Channel disconnect caused close " + e);
+ LOG.trace("Channel inactive caused close {}", cnxn);
}
cnxn.close();
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
- throws Exception
- {
- LOG.warn("Exception caught " + e, e.getCause());
- NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment();
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ LOG.warn("Exception caught", cause);
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null);
if (cnxn != null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Closing " + cnxn);
+ LOG.debug("Closing {}", cnxn);
}
cnxn.close();
}
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception
- {
- if (LOG.isTraceEnabled()) {
- LOG.trace("message received called " + e.getMessage());
- }
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("New message " + e.toString()
- + " from " + ctx.getChannel());
- }
- NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment();
- synchronized(cnxn) {
- processMessage(e, cnxn);
+ if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) {
+ LOG.debug("Received AutoReadEvent.ENABLE");
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+ // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive()
+ // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run
+ // after either of those. Check for null just to be safe ...
+ if (cnxn != null) {
+ cnxn.processQueuedBuffer();
+ }
+ ctx.channel().config().setAutoRead(true);
+ } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) {
+ LOG.debug("Received AutoReadEvent.DISABLE");
+ ctx.channel().config().setAutoRead(false);
}
- } catch(Exception ex) {
- LOG.error("Unexpected exception in receive", ex);
- throw ex;
+ } finally {
+ ReferenceCountUtil.release(evt);
}
}
- private void processMessage(MessageEvent e, NettyServerCnxn cnxn) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: "
- + cnxn.queuedBuffer);
- }
-
- if (e instanceof NettyServerCnxn.ResumeMessageEvent) {
- LOG.debug("Received ResumeMessageEvent");
- if (cnxn.queuedBuffer != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("processing queue "
- + Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
- cnxn.receiveMessage(cnxn.queuedBuffer);
- if (!cnxn.queuedBuffer.readable()) {
- LOG.debug("Processed queue - no bytes remaining");
- cnxn.queuedBuffer = null;
- } else {
- LOG.debug("Processed queue - bytes remaining");
- }
- } else {
- LOG.debug("queue empty");
- }
- cnxn.channel.setReadable(true);
- } else {
- ChannelBuffer buf = (ChannelBuffer)e.getMessage();
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ try {
if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " buf 0x"
- + ChannelBuffers.hexDump(buf));
+ LOG.trace("message received called {}", msg);
}
-
- if (cnxn.throttled) {
- LOG.debug("Received message while throttled");
- // we are throttled, so we need to queue
- if (cnxn.queuedBuffer == null) {
- LOG.debug("allocating queue");
- cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
- }
- cnxn.queuedBuffer.writeBytes(buf);
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("New message {} from {}", msg, ctx.channel());
}
- } else {
- LOG.debug("not throttled");
- if (cnxn.queuedBuffer != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
- cnxn.queuedBuffer.writeBytes(buf);
- if (LOG.isTraceEnabled()) {
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
-
- cnxn.receiveMessage(cnxn.queuedBuffer);
- if (!cnxn.queuedBuffer.readable()) {
- LOG.debug("Processed queue - no bytes remaining");
- cnxn.queuedBuffer = null;
- } else {
- LOG.debug("Processed queue - bytes remaining");
- }
+ NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
+ if (cnxn == null) {
+ LOG.error("channelRead() on a closed or closing NettyServerCnxn");
} else {
- cnxn.receiveMessage(buf);
- if (buf.readable()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Before copy " + buf);
- }
- cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes());
- cnxn.queuedBuffer.writeBytes(buf);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Copy is " + cnxn.queuedBuffer);
- LOG.trace(Long.toHexString(cnxn.sessionId)
- + " queuedBuffer 0x"
- + ChannelBuffers.hexDump(cnxn.queuedBuffer));
- }
- }
+ cnxn.processMessage((ByteBuf) msg);
}
+ } catch (Exception ex) {
+ LOG.error("Unexpected exception in receive", ex);
+ throw ex;
}
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
@Override
- public void writeComplete(ChannelHandlerContext ctx,
- WriteCompletionEvent e) throws Exception
- {
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (LOG.isTraceEnabled()) {
- LOG.trace("write complete " + e);
+ promise.addListener((future) -> {
+ LOG.trace("write {}",
+ future.isSuccess() ? "complete" : "failed");
+ });
}
+ super.write(ctx, msg, promise);
}
- private final class CertificateVerifier
- implements ChannelFutureListener {
+ private final class CertificateVerifier implements GenericFutureListener<Future<Channel>> {
private final SslHandler sslHandler;
private final NettyServerCnxn cnxn;
@@ -291,12 +225,13 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
/**
* Only allow the connection to stay open if certificate passes auth
*/
- public void operationComplete(ChannelFuture future)
- throws SSLPeerUnverifiedException {
+ public void operationComplete(Future<Channel> future) throws SSLPeerUnverifiedException {
if (future.isSuccess()) {
- LOG.debug("Successful handshake with session 0x{}",
- Long.toHexString(cnxn.sessionId));
- SSLEngine eng = sslHandler.getEngine();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successful handshake with session 0x{}",
+ Long.toHexString(cnxn.getSessionId()));
+ }
+ SSLEngine eng = sslHandler.engine();
SSLSession session = eng.getSession();
cnxn.setClientCertificateChain(session.getPeerCertificates());
@@ -316,16 +251,17 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
if (KeeperException.Code.OK !=
authProvider.handleAuthentication(cnxn, null)) {
LOG.error("Authentication failed for session 0x{}",
- Long.toHexString(cnxn.sessionId));
+ Long.toHexString(cnxn.getSessionId()));
cnxn.close();
return;
}
- allChannels.add(future.getChannel());
+ final Channel futureChannel = future.getNow();
+ allChannels.add(Objects.requireNonNull(futureChannel));
addCnxn(cnxn);
} else {
LOG.error("Unsuccessful handshake with session 0x{}",
- Long.toHexString(cnxn.sessionId));
+ Long.toHexString(cnxn.getSessionId()));
cnxn.close();
}
}
@@ -334,30 +270,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
CnxnChannelHandler channelHandler = new CnxnChannelHandler();
- NettyServerCnxnFactory() {
- bootstrap = new ServerBootstrap(
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // parent channel
- bootstrap.setOption("reuseAddress", true);
- // child channels
- bootstrap.setOption("child.tcpNoDelay", true);
- /* set socket linger to off, so that socket close does not block */
- bootstrap.setOption("child.soLinger", -1);
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline p = Channels.pipeline();
- if (secure) {
- initSSL(p);
- }
- p.addLast("servercnxnfactory", channelHandler);
+ private ServerBootstrap configureBootstrapAllocator(ServerBootstrap bootstrap) {
+ ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
+ if (testAllocator != null) {
+ return bootstrap
+ .option(ChannelOption.ALLOCATOR, testAllocator)
+ .childOption(ChannelOption.ALLOCATOR, testAllocator);
+ } else {
+ return bootstrap;
+ }
+ }
- return p;
- }
- });
+ NettyServerCnxnFactory() {
x509Util = new ClientX509Util();
+
+ EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+ EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NettyUtils.nioOrEpollServerSocketChannel())
+ // parent channel options
+ .option(ChannelOption.SO_REUSEADDR, true)
+ // child channels options
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.SO_LINGER, -1)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (secure) {
+ initSSL(pipeline);
+ }
+ pipeline.addLast("servercnxnfactory", channelHandler);
+ }
+ });
+ this.bootstrap = configureBootstrapAllocator(bootstrap);
+ this.bootstrap.validate();
}
private synchronized void initSSL(ChannelPipeline p)
@@ -390,7 +338,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
sslEngine.setNeedClientAuth(true);
p.addLast("ssl", new SslHandler(sslEngine));
- LOG.info("SSL handler added for channel: {}", p.getChannel());
+ LOG.info("SSL handler added for channel: {}", p.channel());
}
@Override
@@ -440,7 +388,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
return localAddress.getPort();
}
- boolean killed;
+ private boolean killed; // use synchronized(this) to access
@Override
public void join() throws InterruptedException {
synchronized(this) {
@@ -452,16 +400,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
@Override
public void shutdown() {
- LOG.info("shutdown called " + localAddress);
+ synchronized (this) {
+ if (killed) {
+ LOG.info("already shutdown {}", localAddress);
+ return;
+ }
+ }
+ LOG.info("shutdown called {}", localAddress);
+
if (login != null) {
login.shutdown();
}
+
+ final EventLoopGroup bossGroup = bootstrap.config().group();
+ final EventLoopGroup workerGroup = bootstrap.config().childGroup();
// null if factory never started
if (parentChannel != null) {
- parentChannel.close().awaitUninterruptibly();
+ ChannelFuture parentCloseFuture = parentChannel.close();
+ if (bossGroup != null) {
+ parentCloseFuture.addListener(future -> {
+ bossGroup.shutdownGracefully();
+ });
+ }
closeAll();
- allChannels.close().awaitUninterruptibly();
- bootstrap.releaseExternalResources();
+ ChannelGroupFuture allChannelsCloseFuture = allChannels.close();
+ if (workerGroup != null) {
+ allChannelsCloseFuture.addListener(future -> {
+ workerGroup.shutdownGracefully();
+ });
+ }
+ } else {
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
}
if (zkServer != null) {
@@ -475,16 +449,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
@Override
public void start() {
- LOG.info("binding to port " + localAddress);
- parentChannel = bootstrap.bind(localAddress);
+ LOG.info("binding to port {}", localAddress);
+ parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
+ // Port changes after bind() if the original port was 0, update
+ // localAddress to get the real port.
+ localAddress = (InetSocketAddress) parentChannel.localAddress();
+ LOG.info("bound to port " + getLocalPort());
}
public void reconfigure(InetSocketAddress addr) {
Channel oldChannel = parentChannel;
try {
LOG.info("binding to port {}", addr);
- parentChannel = bootstrap.bind(addr);
- localAddress = addr;
+ parentChannel = bootstrap.bind(addr).syncUninterruptibly().channel();
+ // Port changes after bind() if the original port was 0, update
+ // localAddress to get the real port.
+ localAddress = (InetSocketAddress) parentChannel.localAddress();
+ LOG.info("bound to port " + getLocalPort());
} catch (Exception e) {
LOG.error("Error while reconfiguring", e);
} finally {
@@ -517,21 +498,39 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
cnxns.add(cnxn);
synchronized (ipMap){
InetAddress addr =
- ((InetSocketAddress)cnxn.channel.getRemoteAddress())
- .getAddress();
+ ((InetSocketAddress)cnxn.getChannel().remoteAddress()).getAddress();
Set<NettyServerCnxn> s = ipMap.get(addr);
if (s == null) {
- s = new HashSet<NettyServerCnxn>();
+ s = new HashSet<>();
+ ipMap.put(addr, s);
}
s.add(cnxn);
- ipMap.put(addr,s);
+ }
+ }
+
+ void removeCnxnFromIpMap(NettyServerCnxn cnxn, InetAddress remoteAddress) {
+ synchronized (ipMap) {
+ Set<NettyServerCnxn> s = ipMap.get(remoteAddress);
+ if (s != null) {
+ s.remove(cnxn);
+ if (s.isEmpty()) {
+ ipMap.remove(remoteAddress);
+ }
+ } else {
+ LOG.error(
+ "Unexpected null set for remote address {} when removing cnxn {}",
+ remoteAddress,
+ cnxn);
+ }
}
}
private int getClientCnxnCount(InetAddress addr) {
- Set<NettyServerCnxn> s = ipMap.get(addr);
- if (s == null) return 0;
- return s.size();
+ synchronized (ipMap) {
+ Set<NettyServerCnxn> s = ipMap.get(addr);
+ if (s == null) return 0;
+ return s.size();
+ }
}
@Override
@@ -552,4 +551,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
return info;
}
+ /**
+ * Sets the test ByteBufAllocator. This allocator will be used by all
+ * future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ * @param allocator the ByteBufAllocator to use for all netty buffer
+ * allocations.
+ */
+ static void setTestAllocator(ByteBufAllocator allocator) {
+ TEST_ALLOCATOR.set(allocator);
+ }
+
+ /**
+ * Clears the test ByteBufAllocator. The default allocator will be used
+ * by all future instances of this class.
+ * It is not recommended to use this method outside of testing.
+ */
+ static void clearTestAllocator() {
+ TEST_ALLOCATOR.set(null);
+ }
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
index 4802ecf..d1e3ba5 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java
@@ -18,10 +18,10 @@
package org.apache.zookeeper.server.quorum;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.ssl.SslHandler;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.common.X509Util;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.handler.ssl.SslHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +61,7 @@ public class UnifiedServerSocket extends ServerSocket {
int bytesRead = prependableSocket.getInputStream().read(litmus, 0, 5);
prependableSocket.prependToInputStream(litmus);
- if (bytesRead == 5 && SslHandler.isEncrypted(ChannelBuffers.wrappedBuffer(litmus))) {
+ if (bytesRead == 5 && SslHandler.isEncrypted(Unpooled.wrappedBuffer(litmus))) {
LOG.info(getInetAddress() + " attempting to connect over ssl");
SSLSocket sslSocket;
try {
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
index 054e1ed..0550bcf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java
@@ -23,10 +23,23 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.test.TestByteBufAllocator;
import org.apache.zookeeper.common.ZKConfig;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
public class ClientCnxnSocketTest {
+ @Before
+ public void setUp() {
+ ClientCnxnSocketNetty.setTestAllocator(TestByteBufAllocator.getInstance());
+ }
+
+ @After
+ public void tearDown() {
+ ClientCnxnSocketNetty.clearTestAllocator();
+ TestByteBufAllocator.checkForLeaks();
+ }
@Test
public void testWhenInvalidJuteMaxBufferIsConfiguredIOExceptionIsThrown() {