You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2019/12/16 21:10:02 UTC

[zookeeper] branch branch-3.6 updated: ZOOKEEPER-3651: try to fix flaky NettyServerCnxnFactoryTest

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.6 by this push:
     new 66e9eda  ZOOKEEPER-3651: try to fix flaky NettyServerCnxnFactoryTest
66e9eda is described below

commit 66e9eda68539c7eda5c67af4f12003382aba8529
Author: Mate Szalay-Beko <sz...@gmail.com>
AuthorDate: Mon Dec 16 22:09:36 2019 +0100

    ZOOKEEPER-3651: try to fix flaky NettyServerCnxnFactoryTest
    
    The testOutstandingHandshakeLimit test is flaky; I tried to fix it in this commit.
    - I added extra comments and did some restructuring in the code.
    - Avoiding starting unnecessary ZooKeeper servers for tests that don't require one
    - Decreasing the number of client connections the test tries to initiate
    - Increasing the timeout to make sure the connections get established
    - Counting only 'SyncConnected' events in the client watcher, to make sure
    the given connection is really established before it is counted
    
    I think the last two points above should fix the flakiness. I tried to run the
    test in Docker, and before the fix it failed for me once in every 4-5 executions.
    After applying these changes, I re-executed it 100 times without a failure.
    
    If these fixes are not enough, then we can introduce a test-only method that
    adds a sleep to the SSL handshake process in the production code, forcing
    handshakes to happen in parallel. However, it would be nice to avoid that.
    Let's hope that these fixes will be enough.
    
    Author: Mate Szalay-Beko <sz...@gmail.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@cloudera.com>
    
    Closes #1184 from symat/ZOOKEEPER-3651
    
    (cherry picked from commit 20daae7d5fa934629e7825ed72e66ad76a94d6aa)
    Signed-off-by: Enrico Olivelli <eo...@apache.org>
---
 .../server/NettyServerCnxnFactoryTest.java         | 150 ++++++++++++++++-----
 1 file changed, 114 insertions(+), 36 deletions(-)

diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java
index afb97b1..76136c4 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnFactoryTest.java
@@ -43,24 +43,33 @@ public class NettyServerCnxnFactoryTest extends ClientBase {
     private static final Logger LOG = LoggerFactory
             .getLogger(NettyServerCnxnFactoryTest.class);
 
-    final LinkedBlockingQueue<ZooKeeper> zks = new LinkedBlockingQueue<ZooKeeper>();
+    ClientX509Util x509Util;
+    final LinkedBlockingQueue<ZooKeeper> zooKeeperClients = new LinkedBlockingQueue<>();
+
 
     @Override
     public void setUp() throws Exception {
         System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
                 "org.apache.zookeeper.server.NettyServerCnxnFactory");
-        super.setUp();
+
+        // by default, we don't start any ZooKeeper server, as not all the tests are needing it.
     }
 
     @Override
     public void tearDown() throws Exception {
-        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
 
-        // clean up
-        for (ZooKeeper zk : zks) {
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+        if (x509Util != null) {
+            SSLAuthTest.clearSecureSetting(x509Util);
+        }
+        for (ZooKeeper zk : zooKeeperClients) {
             zk.close();
         }
-        super.tearDown();
+
+        //stopping the server only if it was started
+        if (serverFactory != null) {
+            super.tearDown();
+        }
     }
 
     @Test
@@ -96,63 +105,132 @@ public class NettyServerCnxnFactoryTest extends ClientBase {
         Assert.assertTrue(factory.getParentChannel().isActive());
     }
 
+    /*
+     * In this test we are flooding the server with SSL connections, and expecting that not
+     * all the connection will succeed at once. Some of the connections should be closed,
+     * as there is a maximum number of parallel SSL handshake the server is willing to do
+     * for security reasons.
+     */
     @Test
     public void testOutstandingHandshakeLimit() throws Exception {
 
+        // setting up SSL params, but disable some debug logs
+        x509Util = SSLAuthTest.setUpSecure();
+        System.clearProperty("javax.net.debug");
+
+        // starting a single server (it will be closed in the tearDown)
+        setUpWithServerId(1);
+
+        // initializing the statistics
         SimpleCounter tlsHandshakeExceeded = (SimpleCounter) ServerMetrics.getMetrics().TLS_HANDSHAKE_EXCEEDED;
         tlsHandshakeExceeded.reset();
         Assert.assertEquals(tlsHandshakeExceeded.get(), 0);
 
-        ClientX509Util x509Util = SSLAuthTest.setUpSecure();
+        // setting the HandshakeLimit to 3, so only 3 SSL handshakes can happen in parallel
         NettyServerCnxnFactory factory = (NettyServerCnxnFactory) serverFactory;
         factory.setSecure(true);
-        factory.setOutstandingHandshakeLimit(10);
+        factory.setOutstandingHandshakeLimit(3);
 
+        // starting the threads that will try to connect to the server
+        // we will have 3 threads, each of them establishing 3 connections
         int threadNum = 3;
-        int cnxnPerThread = 10;
-        Thread[] cnxnWorker = new Thread[threadNum];
-
+        int cnxnPerThread = 3;
+        int cnxnLimit = threadNum * cnxnPerThread;
         AtomicInteger cnxnCreated = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
-
+        Thread[] cnxnWorker = new Thread[threadNum];
         for (int i = 0; i < cnxnWorker.length; i++) {
-            cnxnWorker[i] = new Thread() {
-                @Override
-                public void run() {
-                    for (int i = 0; i < cnxnPerThread; i++) {
-                        try {
-                            zks.add(new ZooKeeper(hostPort, 3000, new Watcher() {
-                                @Override
-                                public void process(WatchedEvent event) {
-                                    int created = cnxnCreated.addAndGet(1);
-                                    if (created == threadNum * cnxnPerThread) {
-                                        latch.countDown();
-                                    }
-                                }
-                            }));
-                        } catch (Exception e) {
-                            LOG.info("Error while creating zk client", e);
-                        }
-                    }
-                }
-            };
+            cnxnWorker[i] = new ClientConnectionGenerator(i, cnxnPerThread, cnxnCreated, cnxnLimit, latch, zooKeeperClients);
             cnxnWorker[i].start();
         }
 
-        Assert.assertThat(latch.await(3, TimeUnit.SECONDS), Matchers.is(true));
-        LOG.info("created {} connections", threadNum * cnxnPerThread);
+        // we might need to wait potentially for a longer time for all the connection to get established,
+        // as the ZooKeeper Server will close some of the connections and the clients will have to re-try
+        boolean allConnectionsCreatedInTime = latch.await(30, TimeUnit.SECONDS);
+        int actualConnections = cnxnCreated.get();
+        LOG.info("created {} connections", actualConnections);
+        if (!allConnectionsCreatedInTime) {
+          Assert.fail(String.format("Only %d out of %d connections created!", actualConnections, cnxnLimit));
+        }
 
-        // Assert throttling not 0
+        // Assert the server refused some of the connections because the handshake limit was reached
+        // (throttling should be greater than 0)
         long handshakeThrottledNum = tlsHandshakeExceeded.get();
         LOG.info("TLS_HANDSHAKE_EXCEEDED: {}", handshakeThrottledNum);
         Assert.assertThat("The number of handshake throttled should be "
                 + "greater than 0", handshakeThrottledNum, Matchers.greaterThan(0L));
 
-        // Assert there is no outstanding handshake anymore
+        // Assert there is no outstanding handshake anymore, all the clients connected in the end
         int outstandingHandshakeNum = factory.getOutstandingHandshakeNum();
         LOG.info("outstanding handshake is {}", outstandingHandshakeNum);
         Assert.assertThat("The outstanding handshake number should be 0 "
                 + "after all cnxns established", outstandingHandshakeNum, Matchers.is(0));
+    }
 
+
+    private final class ClientConnectionWatcher implements Watcher {
+
+        private final AtomicInteger cnxnCreated;
+        private final int cnxnLimit;
+        private final int cnxnThreadId;
+        private final int cnxnId;
+        private final CountDownLatch latch;
+
+        public ClientConnectionWatcher(AtomicInteger cnxnCreated, int cnxnLimit, int cnxnThreadId,
+                                       int cnxnId, CountDownLatch latch) {
+            this.cnxnCreated = cnxnCreated;
+            this.cnxnLimit = cnxnLimit;
+            this.cnxnThreadId = cnxnThreadId;
+            this.cnxnId = cnxnId;
+            this.latch = latch;
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            LOG.info(String.format("WATCHER [thread: %d, cnx:%d] - new event: %s", cnxnThreadId, cnxnId, event.toString()));
+            if (event.getState() == Event.KeeperState.SyncConnected) {
+              int created = cnxnCreated.addAndGet(1);
+              if (created == cnxnLimit) {
+                latch.countDown();
+              }
+            }
+        }
     }
+
+
+    private final class ClientConnectionGenerator extends Thread {
+
+        private final int cnxnThreadId;
+        private final int cnxnPerThread;
+        private final AtomicInteger cnxnCreated;
+        private final int cnxnLimit;
+        private final CountDownLatch latch;
+        private final LinkedBlockingQueue<ZooKeeper> zks;
+
+        private ClientConnectionGenerator(int cnxnThreadId, int cnxnPerThread,
+                                          AtomicInteger cnxnCreated, int cnxnLimit,
+                                          CountDownLatch latch,
+                                          LinkedBlockingQueue<ZooKeeper> zks) {
+            this.cnxnThreadId = cnxnThreadId;
+            this.cnxnPerThread = cnxnPerThread;
+            this.cnxnCreated = cnxnCreated;
+            this.cnxnLimit = cnxnLimit;
+            this.latch = latch;
+            this.zks = zks;
+        }
+
+        @Override
+        public void run() {
+
+            for (int j = 0; j < cnxnPerThread; j++) {
+                try {
+                    zks.add(new ZooKeeper(hostPort, 30000,
+                                          new ClientConnectionWatcher(cnxnCreated, cnxnLimit, cnxnThreadId, j, latch)));
+                } catch (Exception e) {
+                    LOG.info("Error while creating zk client", e);
+                }
+            }
+        }
+    }
+
 }