You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/31 20:55:51 UTC

[geode] 01/02: GEODE-3637: Committing tests in order not to lose progress

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 55fad5e725b73630d5ccb88ca66917c554c31334
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Fri Oct 27 14:27:30 2017 -0700

    GEODE-3637: Committing tests in order not to lose progress
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 364 ++++++++++++---------
 .../cache/tier/sockets/ServerConnection.java       |  14 +-
 .../geode/internal/AcceptorImplDUnitTest.java      | 267 +++++++++++++++
 .../apache/geode/internal/SSLConfigJUnitTest.java  |   5 +-
 4 files changed, 478 insertions(+), 172 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ad910bd..721874f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -95,7 +95,6 @@ import org.apache.geode.internal.util.ArrayUtils;
 /**
  * Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
  * up threads to process requests from these.
- *
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
@@ -115,6 +114,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private final ThreadPoolExecutor hsPool;
 
   /**
+   * A pool used to process client-queue-initializations.
+   */
+  private final ThreadPoolExecutor clientQueueInitPool;
+
+  /**
    * The port on which this acceptor listens for client connections
    */
   private final int localPort;
@@ -268,7 +272,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it
    * will listen on all local addresses.
-   *
    * @since GemFire 5.7
    */
   private final String bindHostName;
@@ -308,14 +311,13 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Initializes this acceptor thread to listen for connections on the given port.
-   *
    * @param port The port on which this acceptor listens for connections. If <code>0</code>, a
-   *        random port will be chosen.
+   * random port will be chosen.
    * @param bindHostName The ip address or host name this acceptor listens on for connections. If
-   *        <code>null</code> or "" then all local addresses are used
+   * <code>null</code> or "" then all local addresses are used
    * @param socketBufferSize The buffer size for server-side sockets
    * @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the
-   *        <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
+   * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
    * @param internalCache The GemFire cache whose contents is served to clients
    * @param maxConnections the maximum number of connections allowed in the server pool
    * @param maxThreads the maximum number of threads allowed in the server pool
@@ -324,11 +326,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * @since GemFire 5.7
    */
   public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
-      int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
-      int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
-      ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
-      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
-      ServerConnectionFactory serverConnectionFactory) throws IOException {
+                      int socketBufferSize, int maximumTimeBetweenPings,
+                      InternalCache internalCache,
+                      int maxConnections, int maxThreads, int maximumMessageCount,
+                      int messageTimeToLive,
+                      ConnectionListener listener, List overflowAttributesList,
+                      boolean isGatewayReceiver,
+                      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+                      ServerConnectionFactory serverConnectionFactory) throws IOException {
     this.securityService = internalCache.getSecurityService();
     this.bindHostName = calcBindHostName(internalCache, bindHostName);
     this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
@@ -440,7 +445,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (;;) {
+        for (; ; ) {
           try {
             this.serverSock.bind(new InetSocketAddress(getBindAddress(), port), backLog);
             break;
@@ -469,7 +474,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         // fix for bug 36617. If BindException is thrown, retry after
         // sleeping. The server may have been stopped and then
         // immediately restarted, which sometimes results in a bind exception
-        for (;;) {
+        for (; ; ) {
           try {
             this.serverSock = this.socketCreator.createServerSocket(port, backLog, getBindAddress(),
                 this.gatewayTransportFilters, socketBufferSize);
@@ -514,7 +519,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       String sockName = getServerName();
       logger.info(LocalizedMessage.create(
           LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
-          new Object[] {sockName, Integer.valueOf(backLog)}));
+          new Object[]{sockName, Integer.valueOf(backLog)}));
       if (isGatewayReceiver) {
         this.stats = GatewayReceiverStats.createGatewayReceiverStats(sockName);
       } else {
@@ -534,103 +539,130 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
-    {
-      ThreadPoolExecutor tmp_pool = null;
-      String gName = "ServerConnection "
-          // + serverSock.getInetAddress()
-          + "on port " + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
-
-        public Thread newThread(final Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
+    pool = initializeServerConnectionThreadPool();
+    hsPool = initializeHandshakerThreadPool();
+    clientQueueInitPool = initializeClientQueueInitializerThreadPool();
+
+    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+    String postAuthzFactoryName =
+        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+    isPostAuthzCallbackPresent =
+        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+  }
+
+  private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
+    String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(Runnable command) {
+        String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incAcceptThreadsCreated();
+        return new Thread(socketThreadGroup, command, threadName);
+      }
+    };
+    try {
+      final BlockingQueue blockingQueue = new SynchronousQueue();
+      final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+          try {
+            blockingQueue.put(r);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt(); // preserve the state
+            throw new RejectedExecutionException(
+                LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
           }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incConnectionThreadsCreated();
-          Runnable r = new Runnable() {
-            public void run() {
-              try {
-                command.run();
-              } catch (CancelException e) { // bug 39463
-                // ignore
-              } finally {
-                ConnectionTable.releaseThreadsSockets();
-              }
-            }
-          };
-          return new Thread(socketThreadGroup, r, tName);
         }
       };
-      try {
-        if (isSelector()) {
-          tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
-              getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
-        } else {
-          tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
-              TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
-        }
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        throw poolInitException;
-      }
-      this.pool = tmp_pool;
+      return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
+          socketThreadFactory, rejectedExecutionHandler);
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      this.pool.shutdownNow();
+      throw poolInitException;
     }
-    {
-      ThreadPoolExecutor tmp_hsPool = null;
-      String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+  }
 
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
+  private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
+    final ThreadGroup clientQueueThreadGroup = LoggingThreadGroup.createThreadGroup(
+        "Client Queue Initialization ", logger);
 
-        public Thread newThread(Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
-          }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incAcceptThreadsCreated();
-          return new Thread(socketThreadGroup, command, tName);
-        }
-      };
-      try {
-        final BlockingQueue bq = new SynchronousQueue();
-        final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
-          public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+    ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String
+            threadName =
+            clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        Runnable runnable = new Runnable() {
+          public void run() {
             try {
-              bq.put(r);
-            } catch (InterruptedException ex) {
-              Thread.currentThread().interrupt(); // preserve the state
-              throw new RejectedExecutionException(
-                  LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
+              command.run();
+            } catch (CancelException e) {
+              logger.debug("Client Queue Initialization was canceled.", e);
             }
           }
         };
-        tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq,
-            socketThreadFactory, reh);
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        this.pool.shutdownNow();
-        throw poolInitException;
+        return new Thread(clientQueueThreadGroup, runnable, threadName);
       }
-      this.hsPool = tmp_hsPool;
+    };
+    try {
+      return new ThreadPoolExecutor(1, 16, 60,
+          TimeUnit.SECONDS, new SynchronousQueue(), clientQueueThreadFactory);
+    } catch (IllegalArgumentException poolInitException) {
+      throw poolInitException;
     }
+  }
 
-    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
-
-    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
-    String postAuthzFactoryName =
-        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-
-    isPostAuthzCallbackPresent =
-        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+  private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException {
+    String gName = "ServerConnection "
+        // + serverSock.getInetAddress()
+        + "on port " + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incConnectionThreadsCreated();
+        Runnable r = new Runnable() {
+          public void run() {
+            try {
+              command.run();
+            } catch (CancelException e) { // bug 39463
+              // ignore
+            } finally {
+              ConnectionTable.releaseThreadsSockets();
+            }
+          }
+        };
+        return new Thread(socketThreadGroup, r, tName);
+      }
+    };
+    try {
+      if (isSelector()) {
+        return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
+            getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
+      } else {
+        return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
+            TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+      }
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      throw poolInitException;
+    }
   }
 
   public long getAcceptorId() {
@@ -651,7 +683,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
-   *
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -660,7 +691,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
-   *
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -798,12 +828,12 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Ensure that the CachedRegionHelper and ServerConnection classes get loaded.
-   *
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
-    if (emergencyClassesLoaded)
+    if (emergencyClassesLoaded) {
       return;
+    }
     emergencyClassesLoaded = true;
     CachedRegionHelper.loadEmergencyClasses();
     ServerConnection.loadEmergencyClasses();
@@ -870,8 +900,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private Selector tmpSel;
 
   private void checkForStuckKeys() {
-    if (!WORKAROUND_SELECTOR_BUG)
+    if (!WORKAROUND_SELECTOR_BUG) {
       return;
+    }
     if (tmpSel == null) {
       try {
         tmpSel = Selector.open();
@@ -887,8 +918,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     while (it.hasNext()) {
       SelectionKey sk = (SelectionKey) it.next();
       ServerConnection sc = (ServerConnection) sk.attachment();
-      if (sc == null)
+      if (sc == null) {
         continue;
+      }
       try {
         sk.cancel();
         this.selector.selectNow(); // clear the cancelled key
@@ -1040,40 +1072,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
           break;
         }
         if (events == 0) {
-          // zeroEventsCount++;
-          // if (zeroEventsCount > 0) {
-          // zeroEventsCount = 0;
           checkForStuckKeys();
-
-          // try {
-          // this.selector.close(); // this selector is sick!
-          // } catch (IOException ignore) {
-          // }
-          // this.selector = Selector.open();
-          // {
-          // Iterator it = selectorRegistrations.iterator();
-          // while (it.hasNext()) {
-          // ServerConnection sc = (ServerConnection)it.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // }
-          // }
-          // ArrayList al = new ArrayList();
-          // Iterator keysIt = this.selector.keys().iterator();
-          // while (keysIt.hasNext()) {
-          // SelectionKey sk = (SelectionKey)keysIt.next();
-          // al.add(sk.attachment());
-          // sk.cancel();
-          // }
-          // events = this.selector.selectNow();
-          // Iterator alIt = al.iterator();
-          // while (alIt.hasNext()) {
-          // ServerConnection sc = (ServerConnection)alIt.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // events = this.selector.select();
-          // } else {
-          // zeroEventsCount = 0;
         }
         while (events > 0) {
           int cancelCount = 0;
@@ -1130,16 +1129,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
                   logger.warn(
                       LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
                 }
-                // } else if (key.isValid() && key.isConnectable()) {
-                // logger.info("DEBUG isConnectable and isValid key=" + key);
-                // finishCon(sc);
               } else {
                 finishCon(sc);
                 if (key.isValid()) {
                   logger.warn(LocalizedMessage.create(
                       LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key));
-                  // } else {
-                  // logger.info("DEBUG !isValid key=" + key);
                 }
               }
             } catch (CancelledKeyException ex) { // fix for bug 37739
@@ -1195,7 +1189,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * The work loop of this acceptor
-   *
    * @see #accept
    */
   public void run() {
@@ -1236,8 +1229,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   /**
-   * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new
-   * {@link ServerConnection}to handle messages from that client.
+   * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new {@link
+   * ServerConnection}to handle messages from that client.
    */
   @Override
   public void accept() {
@@ -1325,7 +1318,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * blocking is good because it will throttle the rate at which we create new connections.
    */
   private void handOffNewClientConnection(final Socket socket,
-      final ServerConnectionFactory serverConnectionFactory) {
+                                          final ServerConnectionFactory serverConnectionFactory) {
     try {
       this.stats.incAcceptsInProgress();
       this.hsPool.execute(new Runnable() {
@@ -1410,7 +1403,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   }
 
   protected void handleNewClientConnection(final Socket socket,
-      final ServerConnectionFactory serverConnectionFactory) throws IOException {
+                                           final ServerConnectionFactory serverConnectionFactory)
+      throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
     // communication, create a ServerConnection. If this socket is being used
     // for 'server to client' communication, send it to the CacheClientNotifier
@@ -1432,14 +1426,23 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     }
 
     // GEODE-3637
-    // if (communicationMode.isSubscriptionFeed()) {
-    // boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-    // logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-    // primary ? "primary" : "secondary", socket);
-    // AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
-    // this.notifyBySubscription);
-    // return;
-    // }
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+      logger.info(
+          ":Bridge server: Initializing {} server-to-client communication socket: {} using selector={}",
+          primary ? "primary" : "secondary", socket, isSelector());
+      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
+          this.notifyBySubscription);
+      return;
+    }
+//    if (communicationMode.isSubscriptionFeed()) {
+//      boolean
+//          isPrimaryServerToClient =
+//          communicationMode == CommunicationMode.PrimaryServerToClient;
+//      clientQueueInitPool
+//          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+//      return;
+//    }
 
     logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
         socket);
@@ -1449,7 +1452,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       if (curCnt >= this.maxConnections) {
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
-            new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
+            new Object[]{socket.getInetAddress(), Integer.valueOf(curCnt),
                 Integer.valueOf(this.maxConnections)}));
         if (communicationMode.expectsConnectionRefusalMessage()) {
           try {
@@ -1489,7 +1492,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
         }
         logger.warn(LocalizedMessage.create(
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
-            new Object[] {serverConn}));
+            new Object[]{serverConn}));
         try {
           ServerHandShakeProcessor.refuse(socket.getOutputStream(),
               LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1664,10 +1667,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * @param bindName the ip address or host name that this acceptor should bind to. If null or ""
-   *        then calculate it.
+   * then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" if all local addresses
-   *         will be listened to.
-   *
+   * will be listened to.
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1704,7 +1706,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Gets the address that this bridge server can be contacted on from external processes.
-   *
    * @since GemFire 5.7
    */
   public String getExternalAddress() {
@@ -1738,7 +1739,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This method finds a client notifier and returns it. It is used to propagate interest
    * registrations to other servers
-   *
    * @return the instance that provides client notification
    */
   @Override
@@ -1796,7 +1796,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * This method returns a thread safe structure which can be iterated over without worrying about
    * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get
    * client info.
-   *
    */
   public ServerConnection[] getAllServerConnectionList() {
     return this.allSCList;
@@ -1820,4 +1819,43 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
     releaseCommBuffer(Message.setTLCommBuffer(null));
   }
+
+  private class ClientQueueInitializerTask implements Runnable {
+    private final Socket socket;
+    private final boolean isPrimaryServerToClient;
+    private final AcceptorImpl acceptor;
+
+    public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
+                                      AcceptorImpl acceptor) {
+      this.socket = socket;
+      this.acceptor = acceptor;
+      this.isPrimaryServerToClient = isPrimaryServerToClient;
+    }
+
+    @Override
+    public void run() {
+      logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
+          isPrimaryServerToClient ? "primary" : "secondary", socket);
+      try {
+        acceptor.getCacheClientNotifier()
+            .registerClient(socket, isPrimaryServerToClient, acceptor.getAcceptorId(),
+                acceptor.isNotifyBySubscription());
+      } catch (IOException ex) {
+        closeSocket(socket);
+        if (isRunning()) {
+          if (!acceptor.loggedAcceptError) {
+            acceptor.loggedAcceptError = true;
+            if (ex instanceof SocketTimeoutException) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT));
+            } else {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0,
+                  ex), ex);
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 5c69769..1cac128 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1191,13 +1191,13 @@ public abstract class ServerConnection implements Runnable {
   }
 
   private void initializeClientNofication() throws IOException {
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-          primary ? "primary" : "secondary", theSocket);
-      getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
-          getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
-    }
+//    if (communicationMode.isSubscriptionFeed()) {
+//      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+//      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+//          primary ? "primary" : "secondary", theSocket);
+//      getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
+//          getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
+//    }
   }
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
new file mode 100644
index 0000000..9da6ca9
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.server.ClientSubscriptionConfig;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class AcceptorImplDUnitTest implements Serializable {
+  private final Host host = Host.getHost(0);
+  private final static int numberOfEntries = 1000;
+  private final static AtomicInteger eventCount = new AtomicInteger(0);
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(
+      host.getVM(1)).build();
+
+  @Rule
+  public SerializableTestName name = new SerializableTestName();
+
+  @Rule
+  public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+
+  @Rule
+  public SharedCountersRule sharedCountersRule = SharedCountersRule.builder().build();
+
+  @Before
+  public void setup() {
+    // "startNow" is the counter client 1 increments and client 2 polls before starting its gets.
+    sharedCountersRule.initialize("startNow");
+    for (VM vm : host.getAllVMs()) {
+      vm.invoke(() -> {
+        System.setProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1");
+      });
+    }
+  }
+
+  @After
+  public void tearDown() {
+    // slowImageProcessing and eventCount are static, i.e. per-VM state that outlives this test;
+    // reset both in every VM so a later run in the same VMs starts from a clean slate.
+    host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
+      InitialImageOperation.slowImageProcessing = 0;
+      eventCount.set(0);
+    }));
+  }
+
+  /**
+   * Builds up a durable client's subscription queue, then reconnects that client while a second
+   * client performs gets; expects numberOfEntries events to arrive and the gets to complete.
+   */
+  @Test
+  public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception {
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Let a failed server start fail the test here instead of handing port 0 to the clients.
+    int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
+      return createSubscriptionServer(cacheRule.getCache());
+    });
+
+    // Durable client "1" registers interest in every key and then closes with close(true),
+    // presumably keep-alive so the server keeps queueing events while it is away (confirm).
+    vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolSubscriptionEnabled(true);
+      clientCacheFactory.setPoolSubscriptionRedundancy(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<Object, Object> region = clientRegionFactory.create("subscriptionRegion");
+
+      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      cache.readyForEvents();
+      cache.close(true);
+    });
+    // A second client without subscriptions puts numberOfEntries entries into the region.
+    vm3.invoke("Start Client2 to add entries to region", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<Object, Object> region = clientRegionFactory.create("subscriptionRegion");
+
+      for (int i = 0; i < numberOfEntries; i++) {
+        region.put(i, i);
+      }
+      cache.close();
+    });
+
+    // Second server: image processing is slowed there as soon as it is up. As with the first
+    // server, a start-up failure propagates rather than being reported as port 0.
+    int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
+      int serverPort = createSubscriptionServer(cacheRule.getCache());
+      InitialImageOperation.slowImageProcessing = 30;
+      return serverPort;
+    });
+
+    vm0.invoke("Turn on slow image processsing", () -> {
+      InitialImageOperation.slowImageProcessing = 30;
+    });
+
+    // Client 1 reconnects to both servers; its listener counts the create/update events it gets.
+    vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolSubscriptionEnabled(true);
+      clientCacheFactory.setPoolSubscriptionRedundancy(1);
+      clientCacheFactory.setPoolMinConnections(1);
+      clientCacheFactory.setPoolMaxConnections(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+      // Signal client 2 (polling "startNow" in vm3) just before this client creates its cache.
+      sharedCountersRule.increment("startNow");
+      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<Object, Object> region = clientRegionFactory
+          .addCacheListener(new CacheListenerAdapter<Object, Object>() {
+            @Override
+            public void afterCreate(EntryEvent<Object, Object> event) {
+              eventCount.incrementAndGet();
+            }
+
+            @Override
+            public void afterUpdate(EntryEvent<Object, Object> event) {
+              eventCount.incrementAndGet();
+            }
+          })
+          .create("subscriptionRegion");
+
+      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      cache.readyForEvents();
+    });
+
+    // Meanwhile client 2 connects with retry attempts set to 0 and gets the first 100 keys.
+    AsyncInvocation<Boolean> completed =
+        vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+          // Sleep first, then check, until client 1 has bumped the shared counter to 1.
+          do {
+            Thread.sleep(100);
+          } while (sharedCountersRule.getTotal("startNow") != 1);
+          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+          clientCacheFactory.setPoolRetryAttempts(0);
+          clientCacheFactory.setPoolMinConnections(1);
+          clientCacheFactory.setPoolMaxConnections(1);
+          clientCacheFactory.setPoolReadTimeout(200);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+          ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+          ClientRegionFactory<Object, Object> clientRegionFactory =
+              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          Region<Object, Object> region = clientRegionFactory.create("subscriptionRegion");
+
+          for (int i = 0; i < 100; i++) {
+            System.err.println("i = " + region.get(i));
+          }
+          cache.close();
+          return true;
+        });
+
+    // Pass only if vm2 counted exactly numberOfEntries events and client 2's gets returned.
+    Awaitility.await().atMost(40, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+        .until(() -> vm2.invoke(() -> {
+          return eventCount.get();
+        }) == numberOfEntries);
+    assertTrue(completed.get());
+  }
+
+  private int createSubscriptionServer(InternalCache cache) throws IOException {
+    initializeDiskStore(cache); // creates "clientQueueDS", the store the server config names
+    initializeReplicateRegion(cache); // creates "subscriptionRegion"
+    return initializeCacheServerWithSubscription(host, cache); // port reported after start()
+  }
+
+  private void initializeDiskStore(InternalCache cache) throws IOException {
+    DiskStoreAttributes attributes = new DiskStoreAttributes();
+    attributes.name = "clientQueueDS"; // folder below: test method name, not the rule's toString()
+    attributes.diskDirs = new File[] {tempDir.newFolder(name.getMethodName() + "_dir")};
+    cache.createDiskStoreFactory(attributes).create("clientQueueDS");
+  }
+
+  private void initializeReplicateRegion(InternalCache cache) {
+    SubscriptionAttributes interestInAll = new SubscriptionAttributes(InterestPolicy.ALL);
+    cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
+        .setSubscriptionAttributes(interestInAll).create("subscriptionRegion");
+  }
+
+  private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
+      throws IOException {
+    CacheServer server = cache.addCacheServer(false);
+    // Port 0 presumably lets the server choose a free port; getPort() is read back after start().
+    server.setPort(0);
+    server.setHostnameForClients(host.getHostName());
+    ClientSubscriptionConfig queueConfig = server.getClientSubscriptionConfig();
+    queueConfig.setEvictionPolicy("entry");
+    queueConfig.setCapacity(5);
+    queueConfig.setDiskStoreName("clientQueueDS");
+    server.start();
+    return server.getPort();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
index 357736b..8904380 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
@@ -21,6 +21,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -44,7 +45,8 @@ public class SSLConfigJUnitTest {
   private static final Properties GATEWAY_SSL_PROPS_MAP = new Properties();
   private static final Properties GATEWAY_PROPS_SUBSET_MAP = new Properties();
 
-  static {
+  @BeforeClass
+  public static void initializeSSLMaps() {
 
     SSL_PROPS_MAP.put("javax.net.ssl.keyStoreType", "jks");
     SSL_PROPS_MAP.put("javax.net.ssl.keyStore", "/export/gemfire-configs/gemfire.keystore");
@@ -383,7 +385,6 @@ public class SSLConfigJUnitTest {
     gemFireProps.put(CLUSTER_SSL_CIPHERS, sslciphers);
     gemFireProps.put(CLUSTER_SSL_REQUIRE_AUTHENTICATION, String.valueOf(requireAuth));
 
-
     gemFireProps.put(SERVER_SSL_ENABLED, String.valueOf(cacheServerSslenabled));
     gemFireProps.put(SERVER_SSL_PROTOCOLS, cacheServerSslprotocols);
     gemFireProps.put(SERVER_SSL_CIPHERS, cacheServerSslciphers);

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.