You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by nk...@apache.org on 2019/06/24 09:05:35 UTC

[zookeeper] branch master updated: ZOOKEEPER-3365: Use Concurrent HashMap with Counter in NettyServerCnxnFactory

This is an automated email from the ASF dual-hosted git repository.

nkalmar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5627cd4  ZOOKEEPER-3365: Use Concurrent HashMap with Counter in NettyServerCnxnFactory
5627cd4 is described below

commit 5627cd4a5a35b31b5b8143eec2a18940d914a7d4
Author: Beluga Behr <da...@gmail.com>
AuthorDate: Mon Jun 24 11:05:25 2019 +0200

    ZOOKEEPER-3365: Use Concurrent HashMap with Counter in NettyServerCnxnFactory
    
    This PR includes improvements related with the Java 8 concurrent package to provide better locking control and less code.
    
    Author: Beluga Behr <da...@gmail.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@apache.org>, Andor Molnar <an...@apache.org>
    
    Closes #912 from BELUGABEHR/ZOOKEEPER-3365-Counter
---
 .../zookeeper/server/NettyServerCnxnFactory.java   | 62 +++++++++-------------
 .../zookeeper/server/NettyServerCnxnTest.java      | 19 +++++++
 .../java/org/apache/zookeeper/test/ClientBase.java |  5 ++
 3 files changed, 50 insertions(+), 36 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index f91a90d..549f7dc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -23,12 +23,13 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.net.ssl.SSLContext;
@@ -99,9 +100,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
     private Channel parentChannel;
     private final ChannelGroup allChannels =
             new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor());
-    // Access to ipMap or to any Set contained in the map needs to be
-    // protected with synchronized (ipMap) { ... }
-    private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>();
+    private final Map<InetAddress, AtomicInteger> ipMap = new ConcurrentHashMap<>();
     private InetSocketAddress localAddress;
     private int maxClientCnxns = 60;
     int listenBacklog = -1;
@@ -635,44 +634,35 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         return localAddress;
     }
 
-    private void addCnxn(NettyServerCnxn cnxn) {
+    private void addCnxn(final NettyServerCnxn cnxn) {
         cnxns.add(cnxn);
-        synchronized (ipMap){
-            InetAddress addr =
-                ((InetSocketAddress)cnxn.getChannel().remoteAddress()).getAddress();
-            Set<NettyServerCnxn> s = ipMap.get(addr);
-            if (s == null) {
-                s = new HashSet<>();
-                ipMap.put(addr, s);
+        InetAddress addr =
+            ((InetSocketAddress) cnxn.getChannel().remoteAddress()).getAddress();
+
+        ipMap.compute(addr, (a, cnxnCount) -> {
+            if (cnxnCount == null) {
+              cnxnCount = new AtomicInteger();
             }
-            s.add(cnxn);
-        }
+            cnxnCount.incrementAndGet();
+            return cnxnCount;
+        });
     }
-
+  
     void removeCnxnFromIpMap(NettyServerCnxn cnxn, InetAddress remoteAddress) {
-        synchronized (ipMap) {
-            Set<NettyServerCnxn> s = ipMap.get(remoteAddress);
-            if (s != null) {
-                s.remove(cnxn);
-                if (s.isEmpty()) {
-                    ipMap.remove(remoteAddress);
-                }
-                return;
-            }
-        }
-        // Fallthrough and log errors outside the synchronized block
-        LOG.error(
-                "Unexpected null set for remote address {} when removing cnxn {}",
-                remoteAddress,
-                cnxn);
+        ipMap.compute(remoteAddress, (addr, cnxnCount) -> {
+        if (cnxnCount == null) {
+            LOG.error("Unexpected remote address {} when removing cnxn {}",
+                remoteAddress, cnxn);
+            return null;
+        }
+        final int newValue = cnxnCount.decrementAndGet();
+        return newValue == 0 ? null : cnxnCount;
+      });
     }
 
-    private int getClientCnxnCount(InetAddress addr) {
-        synchronized (ipMap) {
-            Set<NettyServerCnxn> s = ipMap.get(addr);
-            if (s == null) return 0;
-            return s.size();
-        }
+    private int getClientCnxnCount(final InetAddress addr) {
+      final AtomicInteger count = ipMap.get(addr);
+      return count == null ? 0 : count.get();
     }
 
     @Override
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
index bbead60..068cb29 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.ProtocolException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.TimeUnit;
 
@@ -53,6 +54,8 @@ public class NettyServerCnxnTest extends ClientBase {
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
                 "org.apache.zookeeper.server.NettyServerCnxnFactory");
         NettyServerCnxnFactory.setTestAllocator(TestByteBufAllocator.getInstance());
+        super.maxCnxns = 1;
+        super.exceptionOnFailedConnect = true;
         super.setUp();
     }
 
@@ -110,6 +113,22 @@ public class NettyServerCnxnTest extends ClientBase {
         }
     }
 
+    /**
+     * In the {@link #setUp()} routine, the maximum number of connections per IP
+     * is set to 1. This tests that if more than one connection is attempted, the
+     * connection fails.
+     */
+    @Test(timeout = 40000, expected = ProtocolException.class)
+    public void testMaxConnectionPerIpSurpased() throws Exception {
+        Assert.assertTrue(
+                "Did not instantiate ServerCnxnFactory with NettyServerCnxnFactory!",
+                serverFactory instanceof NettyServerCnxnFactory);
+
+        try (final ZooKeeper zk1 = createClient();
+            final ZooKeeper zk2 = createClient();) {
+        }
+    }
+
     @Test
     public void testClientResponseStatsUpdate() throws IOException, InterruptedException, KeeperException {
         try (ZooKeeper zk = createClient()) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
index 206a4a2..cc223cb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -76,6 +77,7 @@ public abstract class ClientBase extends ZKTestCase {
     protected int maxCnxns = 0;
     protected ServerCnxnFactory serverFactory = null;
     protected File tmpDir = null;
+    protected boolean exceptionOnFailedConnect = false;
 
     long initialFdCount;
 
@@ -228,6 +230,9 @@ public abstract class ClientBase extends ZKTestCase {
         TestableZooKeeper zk = new TestableZooKeeper(hp, timeout, watcher);
         if (!watcher.clientConnected.await(timeout, TimeUnit.MILLISECONDS))
         {
+            if (exceptionOnFailedConnect) {
+              throw new ProtocolException("Unable to connect to server");
+            }
             Assert.fail("Unable to connect to server");
         }
         synchronized(this) {