You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/11/07 01:37:52 UTC

[geode] branch feature/GEODE-3637 updated (849d2fd -> 0187e8c)

This is an automated email from the ASF dual-hosted git repository.

udo pushed a change to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git.


 discard 849d2fd  GEODE-3637: Revert changes to client queue initialization
     add 01dc11f  GEODE-3953: Incorrect use of .equals() for comparison of fieldname arrays GEODE-3954: Misleading Exception message with mismatched fieldAnalyzers
     add f53bdf0  GEODE-3637: Revert changes to client queue initialization
     new 0187e8c  GEODE-3637: Reimplement client queue initialization. Adding shutdown logic

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (849d2fd)
            \
             N -- N -- N   refs/heads/feature/GEODE-3637 (0187e8c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 314 ++++++++++++---------
 .../cache/tier/sockets/ServerConnection.java       |  51 ++--
 .../sockets/AcceptorImplClientQueueDUnitTest.java  | 263 +++++++++++++++++
 .../apache/geode/test/dunit/rules/CacheRule.java   |  22 +-
 .../internal/LuceneIndexCreationProfile.java       |   9 +-
 .../LuceneIndexCreationProfileJUnitTest.java       |   7 +
 6 files changed, 505 insertions(+), 161 deletions(-)
 create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].

[geode] 01/01: GEODE-3637: Reimplement client queue initialization. Adding shutdown logic

Posted by ud...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0187e8cd4fb50711cec77ad0e5459c8cd9a64a7a
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Mon Nov 6 17:37:21 2017 -0800

    GEODE-3637: Reimplement client queue initialization. Adding shutdown logic
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 314 ++++++++++++---------
 .../cache/tier/sockets/ServerConnection.java       |  51 ++--
 .../sockets/AcceptorImplClientQueueDUnitTest.java  | 263 +++++++++++++++++
 .../apache/geode/test/dunit/rules/CacheRule.java   |  22 +-
 4 files changed, 493 insertions(+), 157 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 5b289a9..abc23e7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -103,6 +103,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
+  private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4;
 
   protected final CacheServerStats stats;
   private final int maxConnections;
@@ -115,6 +116,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private final ThreadPoolExecutor hsPool;
 
   /**
+   * A pool used to process client-queue-initializations.
+   */
+  private final ThreadPoolExecutor clientQueueInitPool;
+
+  /**
    * The port on which this acceptor listens for client connections
    */
   private final int localPort;
@@ -534,103 +540,126 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
-    {
-      ThreadPoolExecutor tmp_pool = null;
-      String gName = "ServerConnection "
-          // + serverSock.getInetAddress()
-          + "on port " + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
-
-        public Thread newThread(final Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
+    pool = initializeServerConnectionThreadPool();
+    hsPool = initializeHandshakerThreadPool();
+    clientQueueInitPool = initializeClientQueueInitializerThreadPool();
+
+    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+    String postAuthzFactoryName =
+        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+    isPostAuthzCallbackPresent =
+        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+  }
+
+  private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
+    String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(Runnable command) {
+        String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incAcceptThreadsCreated();
+        return new Thread(socketThreadGroup, command, threadName);
+      }
+    };
+    try {
+      final BlockingQueue blockingQueue = new SynchronousQueue();
+      final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+          try {
+            blockingQueue.put(r);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt(); // preserve the state
+            throw new RejectedExecutionException(
+                LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
           }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incConnectionThreadsCreated();
-          Runnable r = new Runnable() {
-            public void run() {
-              try {
-                command.run();
-              } catch (CancelException e) { // bug 39463
-                // ignore
-              } finally {
-                ConnectionTable.releaseThreadsSockets();
-              }
-            }
-          };
-          return new Thread(socketThreadGroup, r, tName);
         }
       };
-      try {
-        if (isSelector()) {
-          tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
-              getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
-        } else {
-          tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
-              TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
-        }
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        throw poolInitException;
-      }
-      this.pool = tmp_pool;
+      logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
+      return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
+          socketThreadFactory, rejectedExecutionHandler);
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      this.pool.shutdownNow();
+      throw poolInitException;
     }
-    {
-      ThreadPoolExecutor tmp_hsPool = null;
-      String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+  }
 
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
+  private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
+    final ThreadGroup clientQueueThreadGroup =
+        LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
 
-        public Thread newThread(Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
+    ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String threadName =
+            clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        Runnable runnable = new Runnable() {
+          public void run() {
+            try {
+              command.run();
+            } catch (CancelException e) {
+              logger.debug("Client Queue Initialization was canceled.", e);
+            }
           }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incAcceptThreadsCreated();
-          return new Thread(socketThreadGroup, command, tName);
-        }
-      };
-      try {
-        final BlockingQueue bq = new SynchronousQueue();
-        final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
-          public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+        };
+        return new Thread(clientQueueThreadGroup, runnable, threadName);
+      }
+    };
+    return new PooledExecutorWithDMStats(new SynchronousQueue(), 16, getStats().getCnxPoolHelper(),
+        clientQueueThreadFactory, 60000);
+  }
+
+  private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException {
+    String gName = "ServerConnection "
+        // + serverSock.getInetAddress()
+        + "on port " + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incConnectionThreadsCreated();
+        Runnable r = new Runnable() {
+          public void run() {
             try {
-              bq.put(r);
-            } catch (InterruptedException ex) {
-              Thread.currentThread().interrupt(); // preserve the state
-              throw new RejectedExecutionException(
-                  LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
+              command.run();
+            } catch (CancelException e) { // bug 39463
+              // ignore
+            } finally {
+              ConnectionTable.releaseThreadsSockets();
             }
           }
         };
-        tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq,
-            socketThreadFactory, reh);
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        this.pool.shutdownNow();
-        throw poolInitException;
+        return new Thread(socketThreadGroup, r, tName);
       }
-      this.hsPool = tmp_hsPool;
+    };
+    try {
+      if (isSelector()) {
+        return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
+            getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
+      } else {
+        return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
+            TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+      }
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      throw poolInitException;
     }
-
-    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
-
-    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
-    String postAuthzFactoryName =
-        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-
-    isPostAuthzCallbackPresent =
-        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
   }
 
   public long getAcceptorId() {
@@ -666,8 +695,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   @Deprecated
   private static final int DEPRECATED_SELECTOR_POOL_SIZE =
       Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
-  private static final int HANDSHAKE_POOL_SIZE =
-      Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue();
+  private static final int HANDSHAKE_POOL_SIZE = Integer
+      .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
 
   @Override
   public void start() throws IOException {
@@ -802,8 +831,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
-    if (emergencyClassesLoaded)
+    if (emergencyClassesLoaded) {
       return;
+    }
     emergencyClassesLoaded = true;
     CachedRegionHelper.loadEmergencyClasses();
     ServerConnection.loadEmergencyClasses();
@@ -870,8 +900,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private Selector tmpSel;
 
   private void checkForStuckKeys() {
-    if (!WORKAROUND_SELECTOR_BUG)
+    if (!WORKAROUND_SELECTOR_BUG) {
       return;
+    }
     if (tmpSel == null) {
       try {
         tmpSel = Selector.open();
@@ -887,8 +918,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     while (it.hasNext()) {
       SelectionKey sk = (SelectionKey) it.next();
       ServerConnection sc = (ServerConnection) sk.attachment();
-      if (sc == null)
+      if (sc == null) {
         continue;
+      }
       try {
         sk.cancel();
         this.selector.selectNow(); // clear the cancelled key
@@ -1040,40 +1072,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
           break;
         }
         if (events == 0) {
-          // zeroEventsCount++;
-          // if (zeroEventsCount > 0) {
-          // zeroEventsCount = 0;
           checkForStuckKeys();
-
-          // try {
-          // this.selector.close(); // this selector is sick!
-          // } catch (IOException ignore) {
-          // }
-          // this.selector = Selector.open();
-          // {
-          // Iterator it = selectorRegistrations.iterator();
-          // while (it.hasNext()) {
-          // ServerConnection sc = (ServerConnection)it.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // }
-          // }
-          // ArrayList al = new ArrayList();
-          // Iterator keysIt = this.selector.keys().iterator();
-          // while (keysIt.hasNext()) {
-          // SelectionKey sk = (SelectionKey)keysIt.next();
-          // al.add(sk.attachment());
-          // sk.cancel();
-          // }
-          // events = this.selector.selectNow();
-          // Iterator alIt = al.iterator();
-          // while (alIt.hasNext()) {
-          // ServerConnection sc = (ServerConnection)alIt.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // events = this.selector.select();
-          // } else {
-          // zeroEventsCount = 0;
         }
         while (events > 0) {
           int cancelCount = 0;
@@ -1130,16 +1129,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
                   logger.warn(
                       LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
                 }
-                // } else if (key.isValid() && key.isConnectable()) {
-                // logger.info("DEBUG isConnectable and isValid key=" + key);
-                // finishCon(sc);
               } else {
                 finishCon(sc);
                 if (key.isValid()) {
                   logger.warn(LocalizedMessage.create(
                       LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key));
-                  // } else {
-                  // logger.info("DEBUG !isValid key=" + key);
                 }
               }
             } catch (CancelledKeyException ex) { // fix for bug 37739
@@ -1405,6 +1399,10 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     return this.clientServerCnxCount.get();
   }
 
+  public boolean isNotifyBySubscription() {
+    return notifyBySubscription;
+  }
+
   protected void handleNewClientConnection(final Socket socket,
       final ServerConnectionFactory serverConnectionFactory) throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
@@ -1427,12 +1425,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       throw new EOFException();
     }
 
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-          primary ? "primary" : "secondary", socket);
-      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
-          this.notifyBySubscription);
+    // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue
+    // initialization to be done in another threadPool
+    if (initializeClientPools(socket, communicationMode)) {
       return;
     }
 
@@ -1498,6 +1493,17 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     }
   }
 
+  private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) {
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean isPrimaryServerToClient =
+          communicationMode == CommunicationMode.PrimaryServerToClient;
+      clientQueueInitPool
+          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+      return true;
+    }
+    return false;
+  }
+
   private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
     socket.setSoTimeout(this.acceptTimeout);
     this.socketCreator.configureServerSSLSocket(socket);
@@ -1637,6 +1643,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       Thread.currentThread().interrupt();
       this.pool.shutdownNow();
     }
+    this.clientQueueInitPool.shutdownNow();
     this.hsPool.shutdownNow();
   }
 
@@ -1654,6 +1661,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     return !isRunning() && !thread.isAlive()
         && (selectorThread == null || !selectorThread.isAlive())
         && (pool == null || pool.isShutdown()) && (hsPool == null || hsPool.isShutdown())
+        && (clientQueueInitPool == null || clientQueueInitPool.isShutdown())
         && (selector == null || !selector.isOpen()) && (tmpSel == null || !tmpSel.isOpen());
   }
 
@@ -1662,7 +1670,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    *        then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" if all local addresses
    *         will be listened to.
-   *
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1791,7 +1798,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * This method returns a thread safe structure which can be iterated over without worrying about
    * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get
    * client info.
-   *
    */
   public ServerConnection[] getAllServerConnectionList() {
     return this.allSCList;
@@ -1815,4 +1821,42 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
     releaseCommBuffer(Message.setTLCommBuffer(null));
   }
+
+  private class ClientQueueInitializerTask implements Runnable {
+    private final Socket socket;
+    private final boolean isPrimaryServerToClient;
+    private final AcceptorImpl acceptor;
+
+    public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
+        AcceptorImpl acceptor) {
+      this.socket = socket;
+      this.acceptor = acceptor;
+      this.isPrimaryServerToClient = isPrimaryServerToClient;
+    }
+
+    @Override
+    public void run() {
+      logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
+          isPrimaryServerToClient ? "primary" : "secondary", socket);
+      try {
+        acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
+            acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
+      } catch (IOException ex) {
+        closeSocket(socket);
+        if (isRunning()) {
+          if (!acceptor.loggedAcceptError) {
+            acceptor.loggedAcceptError = true;
+            if (ex instanceof SocketTimeoutException) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT));
+            } else {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0,
+                  ex), ex);
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 0e510af..74451e5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -189,7 +189,9 @@ public abstract class ServerConnection implements Runnable {
    */
   private volatile int requestSpecificTimeout = -1;
 
-  /** Tracks the id of the most recent batch to which a reply has been sent */
+  /**
+   * Tracks the id of the most recent batch to which a reply has been sent
+   */
   private int latestBatchIdReplied = -1;
 
   /*
@@ -711,8 +713,9 @@ public abstract class ServerConnection implements Runnable {
       // can be used.
       initializeCommands();
       // its initialized in verifyClientConnection call
-      if (!getCommunicationMode().isWAN())
+      if (!getCommunicationMode().isWAN()) {
         initializeClientUserAuths();
+      }
     }
     if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
       Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake),
@@ -892,7 +895,9 @@ public abstract class ServerConnection implements Runnable {
         }
       }
       if (unregisterClient)// last serverconnection call all close on auth objects
+      {
         cleanClientAuths();
+      }
       this.clientUserAuths = null;
       if (needsUnregister) {
         this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this);
@@ -917,8 +922,9 @@ public abstract class ServerConnection implements Runnable {
     ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode());
     ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua);
 
-    if (retCua == null)
+    if (retCua == null) {
       return cua;
+    }
     return retCua;
   }
 
@@ -954,8 +960,9 @@ public abstract class ServerConnection implements Runnable {
         boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId());
 
         // if not successfull, try the old way
-        if (!removed)
+        if (!removed) {
           removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive);
+        }
         return removed;
 
       } catch (NullPointerException npe) {
@@ -1010,7 +1017,6 @@ public abstract class ServerConnection implements Runnable {
         throw new AuthenticationFailedException("Authentication failed");
       }
 
-
       byte[] credBytes = msg.getPart(0).getSerializedForm();
 
       credBytes = ((HandShake) this.handshake).decryptBytes(credBytes);
@@ -1124,6 +1130,7 @@ public abstract class ServerConnection implements Runnable {
 
   public void run() {
     setOwner();
+
     if (getAcceptor().isSelector()) {
       boolean finishedMsg = false;
       try {
@@ -1136,9 +1143,7 @@ public abstract class ServerConnection implements Runnable {
             finishedMsg = true;
           }
         }
-      } catch (java.nio.channels.ClosedChannelException ignore) {
-        // ok shutting down
-      } catch (CancelException e) {
+      } catch (java.nio.channels.ClosedChannelException | CancelException ignore) {
         // ok shutting down
       } catch (IOException ex) {
         logger.warn(
@@ -1187,6 +1192,7 @@ public abstract class ServerConnection implements Runnable {
    * If registered with a selector then this will be the key we are registered with.
    */
   // private SelectionKey sKey = null;
+
   /**
    * Register this connection with the given selector for read events. Note that switch the channel
    * to non-blocking so it can be in a selector.
@@ -1202,7 +1208,8 @@ public abstract class ServerConnection implements Runnable {
   }
 
   public void registerWithSelector2(Selector s) throws IOException {
-    /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
+    /* this.sKey = */
+    getSelectableChannel().register(s, SelectionKey.OP_READ, this);
   }
 
   /**
@@ -1225,7 +1232,6 @@ public abstract class ServerConnection implements Runnable {
   }
 
   /**
-   *
    * @return String representing the DistributedSystemMembership of the Client VM
    */
   public String getMembershipID() {
@@ -1265,10 +1271,11 @@ public abstract class ServerConnection implements Runnable {
   }
 
   protected int getClientReadTimeout() {
-    if (this.requestSpecificTimeout == -1)
+    if (this.requestSpecificTimeout == -1) {
       return this.handshake.getClientReadTimeout();
-    else
+    } else {
       return this.requestSpecificTimeout;
+    }
   }
 
   protected boolean isProcessingMessage() {
@@ -1519,7 +1526,9 @@ public abstract class ServerConnection implements Runnable {
     return this.name;
   }
 
-  /** returns the name of this connection */
+  /**
+   * returns the name of this connection
+   */
   public String getName() {
     return this.name;
   }
@@ -1736,11 +1745,13 @@ public abstract class ServerConnection implements Runnable {
     // for backward client it will be store in member variable userAuthId
     // for other look "requestMsg" here and get unique-id from this to get the authzrequest
 
-    if (!AcceptorImpl.isAuthenticationRequired())
+    if (!AcceptorImpl.isAuthenticationRequired()) {
       return null;
+    }
 
-    if (AcceptorImpl.isIntegratedSecurity())
+    if (AcceptorImpl.isIntegratedSecurity()) {
       return null;
+    }
 
     long uniqueId = getUniqueId();
 
@@ -1768,11 +1779,13 @@ public abstract class ServerConnection implements Runnable {
 
   public AuthorizeRequestPP getPostAuthzRequest()
       throws AuthenticationRequiredException, IOException {
-    if (!AcceptorImpl.isAuthenticationRequired())
+    if (!AcceptorImpl.isAuthenticationRequired()) {
       return null;
+    }
 
-    if (AcceptorImpl.isIntegratedSecurity())
+    if (AcceptorImpl.isIntegratedSecurity()) {
       return null;
+    }
 
     // look client version and return authzrequest
     // for backward client it will be store in member variable userAuthId
@@ -1799,7 +1812,9 @@ public abstract class ServerConnection implements Runnable {
     return postAuthReq;
   }
 
-  /** returns the member ID byte array to be used for creating EventID objects */
+  /**
+   * returns the member ID byte array to be used for creating EventID objects
+   */
   public byte[] getEventMemberIDByteArray() {
     return this.memberIdByteArray;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
new file mode 100644
index 0000000..c0b2d07
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.server.ClientSubscriptionConfig;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedLockBlackboard;
+import org.apache.geode.distributed.DistributedLockBlackboardImpl;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class AcceptorImplClientQueueDUnitTest implements Serializable {
+  private final Host host = Host.getHost(0);
+  private static final int numberOfEntries = 200;
+  private static final AtomicInteger eventCount = new AtomicInteger(0);
+  private static final AtomicBoolean completedClient2 = new AtomicBoolean(false);
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule =
+      CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
+          .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
+
+  @Rule
+  public SerializableTestName name = new SerializableTestName();
+
+  @Rule
+  public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  private DistributedLockBlackboard blackboard = null;
+
+  @Before
+  public void setup() throws Exception {
+    blackboard = DistributedLockBlackboardImpl.getInstance();
+  }
+
+  @After
+  public void tearDown() throws RemoteException {
+    blackboard.initCount();
+    host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
+      InitialImageOperation.slowImageProcessing = 0;
+      System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
+    }));
+  }
+
+  @Test
+  public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception {
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
+      try {
+        return createSubscriptionServer(cacheRule.getCache());
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolSubscriptionEnabled(true);
+      clientCacheFactory.setPoolSubscriptionRedundancy(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      cache.readyForEvents();
+      cache.close(true);
+    });
+    vm3.invoke("Start Client2 to add entries to region", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      for (int i = 0; i < numberOfEntries; i++) {
+        region.put(i, i);
+      }
+      cache.close();
+    });
+
+    int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
+      try {
+        int serverPort = createSubscriptionServer(cacheRule.getCache());
+        InitialImageOperation.slowImageProcessing = 30;
+        return serverPort;
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm0.invoke("Turn on slow image processsing", () -> {
+      InitialImageOperation.slowImageProcessing = 30;
+    });
+
+    AsyncInvocation<Boolean> completedClient1 =
+        vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+
+          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+          clientCacheFactory.setPoolSubscriptionEnabled(true);
+          clientCacheFactory.setPoolSubscriptionRedundancy(1);
+          clientCacheFactory.setPoolMinConnections(1);
+          clientCacheFactory.setPoolMaxConnections(1);
+          clientCacheFactory.setPoolReadTimeout(200);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+          ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
+              .set("durable-client-timeout", "300").set("mcast-port", "0");
+          blackboard.incCount();
+          ClientCache cache = cacheFactory.create();
+
+          ClientRegionFactory<Object, Object> clientRegionFactory =
+              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() {
+            @Override
+            public void afterCreate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+
+            @Override
+            public void afterUpdate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+          }).create("subscriptionRegion");
+
+          region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+          cache.readyForEvents();
+          Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+              .until(() -> eventCount.get() == numberOfEntries);
+          cache.close();
+          return eventCount.get() == numberOfEntries;
+        });
+
+    vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+      while (true) {
+        Thread.sleep(100);
+        if (blackboard.getCount() == 1) {
+          break;
+        }
+      }
+      ClientCache cache = null;
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolRetryAttempts(0);
+      clientCacheFactory.setPoolMinConnections(1);
+      clientCacheFactory.setPoolMaxConnections(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.setPoolSocketConnectTimeout(500);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+      cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      int returnValue = 0;
+      for (int i = 0; i < 100; i++) {
+        returnValue = (int) region.get(i);
+      }
+      cache.close();
+      completedClient2.set(returnValue == 99);
+    });
+    assertTrue(completedClient1.get());
+    assertTrue(vm3.invoke(() -> completedClient2.get()));
+  }
+
+  private int createSubscriptionServer(InternalCache cache) throws IOException {
+    initializeDiskStore(cache);
+    initializeReplicateRegion(cache);
+    return initializeCacheServerWithSubscription(host, cache);
+  }
+
+  private void initializeDiskStore(InternalCache cache) throws IOException {
+    DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
+    diskStoreAttributes.name = "clientQueueDS";
+    diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")};
+    cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
+  }
+
+  private void initializeReplicateRegion(InternalCache cache) {
+    cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
+        .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL))
+        .create("subscriptionRegion");
+  }
+
+  private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
+      throws IOException {
+    CacheServer cacheServer1 = cache.addCacheServer(false);
+    ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig();
+    clientSubscriptionConfig.setEvictionPolicy("entry");
+    clientSubscriptionConfig.setCapacity(5);
+    clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
+    cacheServer1.setPort(0);
+    cacheServer1.setHostnameForClients(host.getHostName());
+    cacheServer1.start();
+    return cacheServer1.getPort();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
index dc42da8..b65bf86 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
@@ -63,6 +63,7 @@ public class CacheRule extends DistributedExternalResource {
   private final boolean disconnectAfter;
   private final List<VM> createCacheInVMs;
   private final Properties config;
+  private final Properties systemProperties;
 
   public static Builder builder() {
     return new Builder();
@@ -74,18 +75,19 @@ public class CacheRule extends DistributedExternalResource {
     this.disconnectAfter = builder.disconnectAfter;
     this.createCacheInVMs = builder.createCacheInVMs;
     this.config = builder.config;
+    this.systemProperties = builder.systemProperties;
   }
 
   @Override
   protected void before() {
     if (createCacheInAll) {
-      invoker().invokeInEveryVMAndController(() -> createCache(config));
+      invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties));
     } else {
       if (createCache) {
-        createCache(config);
+        createCache(config, systemProperties);
       }
       for (VM vm : createCacheInVMs) {
-        vm.invoke(() -> createCache(config));
+        vm.invoke(() -> createCache(config, systemProperties));
       }
     }
   }
@@ -108,7 +110,8 @@ public class CacheRule extends DistributedExternalResource {
     return cache.getInternalDistributedSystem();
   }
 
-  private static void createCache(final Properties config) {
+  private static void createCache(final Properties config, final Properties systemProperties) {
+    System.getProperties().putAll(systemProperties);
     cache = (InternalCache) new CacheFactory(config).create();
   }
 
@@ -141,6 +144,7 @@ public class CacheRule extends DistributedExternalResource {
     private boolean disconnectAfter;
     private List<VM> createCacheInVMs = new ArrayList<>();
     private Properties config = new Properties();
+    private Properties systemProperties = new Properties();
 
     public Builder() {
       config.setProperty(LOCATORS, getLocators());
@@ -195,6 +199,16 @@ public class CacheRule extends DistributedExternalResource {
       return this;
     }
 
+    public Builder addSystemProperty(final String key, final String value) {
+      this.systemProperties.put(key, value);
+      return this;
+    }
+
+    public Builder addSystemProperties(final Properties config) {
+      this.systemProperties.putAll(config);
+      return this;
+    }
+
     public CacheRule build() {
       return new CacheRule(this);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.