You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by nk...@apache.org on 2019/06/24 09:05:35 UTC
[zookeeper] branch master updated: ZOOKEEPER-3365: Use Concurrent
HashMap with Counter in NettyServerCnxnFactory
This is an automated email from the ASF dual-hosted git repository.
nkalmar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5627cd4 ZOOKEEPER-3365: Use Concurrent HashMap with Counter in NettyServerCnxnFactory
5627cd4 is described below
commit 5627cd4a5a35b31b5b8143eec2a18940d914a7d4
Author: Beluga Behr <da...@gmail.com>
AuthorDate: Mon Jun 24 11:05:25 2019 +0200
ZOOKEEPER-3365: Use Concurrent HashMap with Counter in NettyServerCnxnFactory
This PR includes improvements related with the Java 8 concurrent package to provide better locking control and less code.
Author: Beluga Behr <da...@gmail.com>
Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@apache.org>, Andor Molnar <an...@apache.org>
Closes #912 from BELUGABEHR/ZOOKEEPER-3365-Counter
---
.../zookeeper/server/NettyServerCnxnFactory.java | 62 +++++++++-------------
.../zookeeper/server/NettyServerCnxnTest.java | 19 +++++++
.../java/org/apache/zookeeper/test/ClientBase.java | 5 ++
3 files changed, 50 insertions(+), 36 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index f91a90d..549f7dc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -23,12 +23,13 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLContext;
@@ -99,9 +100,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
private Channel parentChannel;
private final ChannelGroup allChannels =
new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
- // Access to ipMap or to any Set contained in the map needs to be
- // protected with synchronized (ipMap) { ... }
- private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
+ private final Map<InetAddress, AtomicInteger> ipMap = new ConcurrentHashMap<>();
private InetSocketAddress localAddress;
private int maxClientCnxns = 60;
int listenBacklog = -1;
@@ -635,44 +634,35 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
return localAddress;
}
- private void addCnxn(NettyServerCnxn cnxn) {
+ private void addCnxn(final NettyServerCnxn cnxn) {
cnxns.add(cnxn);
- synchronized (ipMap){
- InetAddress addr =
- ((InetSocketAddress)cnxn.getChannel().remoteAddress()).getAddress();
- Set<NettyServerCnxn> s = ipMap.get(addr);
- if (s == null) {
- s = new HashSet<>();
- ipMap.put(addr, s);
+ InetAddress addr =
+ ((InetSocketAddress) cnxn.getChannel().remoteAddress()).getAddress();
+
+ ipMap.compute(addr, (a, cnxnCount) -> {
+ if (cnxnCount == null) {
+ cnxnCount = new AtomicInteger();
}
- s.add(cnxn);
- }
+ cnxnCount.incrementAndGet();
+ return cnxnCount;
+ });
}
-
+
void removeCnxnFromIpMap(NettyServerCnxn cnxn, InetAddress remoteAddress) {
- synchronized (ipMap) {
- Set<NettyServerCnxn> s = ipMap.get(remoteAddress);
- if (s != null) {
- s.remove(cnxn);
- if (s.isEmpty()) {
- ipMap.remove(remoteAddress);
- }
- return;
- }
- }
- // Fallthrough and log errors outside the synchronized block
- LOG.error(
- "Unexpected null set for remote address {} when removing cnxn {}",
- remoteAddress,
- cnxn);
+ ipMap.compute(remoteAddress, (addr, cnxnCount) -> {
+ if (cnxnCount == null) {
+ LOG.error("Unexpected remote address {} when removing cnxn {}",
+ remoteAddress, cnxn);
+ return null;
+ }
+ final int newValue = cnxnCount.decrementAndGet();
+ return newValue == 0 ? null : cnxnCount;
+ });
}
- private int getClientCnxnCount(InetAddress addr) {
- synchronized (ipMap) {
- Set<NettyServerCnxn> s = ipMap.get(addr);
- if (s == null) return 0;
- return s.size();
- }
+ private int getClientCnxnCount(final InetAddress addr) {
+ final AtomicInteger count = ipMap.get(addr);
+ return count == null ? 0 : count.get();
}
@Override
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
index bbead60..068cb29 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.ProtocolException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
@@ -53,6 +54,8 @@ public class NettyServerCnxnTest extends ClientBase {
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
"org.apache.zookeeper.server.NettyServerCnxnFactory");
NettyServerCnxnFactory.setTestAllocator(TestByteBufAllocator.getInstance());
+ super.maxCnxns = 1;
+ super.exceptionOnFailedConnect = true;
super.setUp();
}
@@ -110,6 +113,22 @@ public class NettyServerCnxnTest extends ClientBase {
}
}
+ /**
+ * In the {@link #setUp()} routine, the maximum number of connections per IP
+ * is set to 1. This tests that if more than one connection is attempted, the
+ * connection fails.
+ */
+ @Test(timeout = 40000, expected = ProtocolException.class)
+ public void testMaxConnectionPerIpSurpased() throws Exception {
+ Assert.assertTrue(
+ "Did not instantiate ServerCnxnFactory with NettyServerCnxnFactory!",
+ serverFactory instanceof NettyServerCnxnFactory);
+
+ try (final ZooKeeper zk1 = createClient();
+ final ZooKeeper zk2 = createClient();) {
+ }
+ }
+
@Test
public void testClientResponseStatsUpdate() throws IOException, InterruptedException, KeeperException {
try (ZooKeeper zk = createClient()) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
index 206a4a2..cc223cb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
@@ -26,6 +26,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.ConnectException;
+import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -76,6 +77,7 @@ public abstract class ClientBase extends ZKTestCase {
protected int maxCnxns = 0;
protected ServerCnxnFactory serverFactory = null;
protected File tmpDir = null;
+ protected boolean exceptionOnFailedConnect = false;
long initialFdCount;
@@ -228,6 +230,9 @@ public abstract class ClientBase extends ZKTestCase {
TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
{
+ if (exceptionOnFailedConnect) {
+ throw new ProtocolException("Unable to connect to server");
+ }
Assert.fail("Unable to connect to server");
}
synchronized(this) {