You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/31 20:57:20 UTC

[geode] branch feature/GEODE-3637 updated (6e4e702 -> 8749293)

This is an automated email from the ASF dual-hosted git repository.

udo pushed a change to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git.


 discard 6e4e702  GEODE-3637: Final commit with test to confirm asynchronous client queue creation
 discard 55fad5e  GEODE-3637: Committing tests in order not to loose progress
 discard a5e3ed0  GEODE-3637: Moved client queue initialization into the ServerConnection.java
     new 8749293  GEODE-3637: Moved client queue initialization into the ServerConnection.java Added test to confirm asynchronous client queue creation

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (6e4e702)
            \
             N -- N -- N   refs/heads/feature/GEODE-3637 (8749293)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].

[geode] 01/01: GEODE-3637: Moved client queue initialization into the ServerConnection.java Added test to confirm asynchronous client queue creation

Posted by ud...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8749293c7c3a9b5ce095602c1057575313d6994d
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Wed Oct 25 12:37:50 2017 -0700

    GEODE-3637: Moved client queue initialization into the ServerConnection.java
    Added test to confirm asynchronous client queue creation
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 334 ++++++++++++---------
 .../cache/tier/sockets/ServerConnection.java       |  65 ++--
 .../apache/geode/internal/SSLConfigJUnitTest.java  |   5 +-
 .../sockets/AcceptorImplClientQueueDUnitTest.java  | 263 ++++++++++++++++
 .../apache/geode/test/dunit/rules/CacheRule.java   |  22 +-
 5 files changed, 514 insertions(+), 175 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 59ef466..193b300 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -95,7 +95,7 @@ import org.apache.geode.internal.util.ArrayUtils;
 /**
  * Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
  * up threads to process requests from these.
- *
+ * 
  * @since GemFire 2.0.2
  */
 @SuppressWarnings("deprecation")
@@ -103,6 +103,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private static final Logger logger = LogService.getLogger();
 
   private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
+  private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4;
 
   protected final CacheServerStats stats;
   private final int maxConnections;
@@ -115,6 +116,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private final ThreadPoolExecutor hsPool;
 
   /**
+   * A pool used to process client-queue-initializations.
+   */
+  private final ThreadPoolExecutor clientQueueInitPool;
+
+  /**
    * The port on which this acceptor listens for client connections
    */
   private final int localPort;
@@ -268,7 +274,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it
    * will listen on all local addresses.
-   *
+   * 
    * @since GemFire 5.7
    */
   private final String bindHostName;
@@ -308,7 +314,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Initializes this acceptor thread to listen for connections on the given port.
-   *
+   * 
    * @param port The port on which this acceptor listens for connections. If <code>0</code>, a
    *        random port will be chosen.
    * @param bindHostName The ip address or host name this acceptor listens on for connections. If
@@ -534,103 +540,130 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
         this.clientNotifier.getStats());
 
-    {
-      ThreadPoolExecutor tmp_pool = null;
-      String gName = "ServerConnection "
-          // + serverSock.getInetAddress()
-          + "on port " + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
-
-        public Thread newThread(final Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
+    pool = initializeServerConnectionThreadPool();
+    hsPool = initializeHandshakerThreadPool();
+    clientQueueInitPool = initializeClientQueueInitializerThreadPool();
+
+    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+    String postAuthzFactoryName =
+        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+    isPostAuthzCallbackPresent =
+        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+  }
+
+  private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
+    String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(Runnable command) {
+        String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incAcceptThreadsCreated();
+        return new Thread(socketThreadGroup, command, threadName);
+      }
+    };
+    try {
+      final BlockingQueue blockingQueue = new SynchronousQueue();
+      final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
+        public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+          try {
+            blockingQueue.put(r);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt(); // preserve the state
+            throw new RejectedExecutionException(
+                LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
           }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incConnectionThreadsCreated();
-          Runnable r = new Runnable() {
-            public void run() {
-              try {
-                command.run();
-              } catch (CancelException e) { // bug 39463
-                // ignore
-              } finally {
-                ConnectionTable.releaseThreadsSockets();
-              }
-            }
-          };
-          return new Thread(socketThreadGroup, r, tName);
         }
       };
-      try {
-        if (isSelector()) {
-          tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
-              getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
-        } else {
-          tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
-              TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
-        }
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        throw poolInitException;
-      }
-      this.pool = tmp_pool;
+      logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
+      return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
+          socketThreadFactory, rejectedExecutionHandler);
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      this.pool.shutdownNow();
+      throw poolInitException;
     }
-    {
-      ThreadPoolExecutor tmp_hsPool = null;
-      String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
-      final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+  }
 
-      ThreadFactory socketThreadFactory = new ThreadFactory() {
-        int connNum = -1;
+  private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
+    final ThreadGroup clientQueueThreadGroup =
+        LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
 
-        public Thread newThread(Runnable command) {
-          int tnum;
-          synchronized (this) {
-            tnum = ++connNum;
-          }
-          String tName = socketThreadGroup.getName() + " Thread " + tnum;
-          getStats().incAcceptThreadsCreated();
-          return new Thread(socketThreadGroup, command, tName);
-        }
-      };
-      try {
-        final BlockingQueue bq = new SynchronousQueue();
-        final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
-          public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+    ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String threadName =
+            clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        Runnable runnable = new Runnable() {
+          public void run() {
             try {
-              bq.put(r);
-            } catch (InterruptedException ex) {
-              Thread.currentThread().interrupt(); // preserve the state
-              throw new RejectedExecutionException(
-                  LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
+              command.run();
+            } catch (CancelException e) {
+              logger.debug("Client Queue Initialization was canceled.", e);
             }
           }
         };
-        tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq,
-            socketThreadFactory, reh);
-      } catch (IllegalArgumentException poolInitException) {
-        this.stats.close();
-        this.serverSock.close();
-        this.pool.shutdownNow();
-        throw poolInitException;
+        return new Thread(clientQueueThreadGroup, runnable, threadName);
       }
-      this.hsPool = tmp_hsPool;
+    };
+    try {
+      return new ThreadPoolExecutor(1, 16, 60, TimeUnit.SECONDS, new SynchronousQueue(),
+          clientQueueThreadFactory);
+    } catch (IllegalArgumentException poolInitException) {
+      throw poolInitException;
     }
+  }
 
-    isAuthenticationRequired = this.securityService.isClientSecurityRequired();
-
-    isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
-    String postAuthzFactoryName =
-        this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-
-    isPostAuthzCallbackPresent =
-        (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+  private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException {
+    String gName = "ServerConnection "
+        // + serverSock.getInetAddress()
+        + "on port " + this.localPort;
+    final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+    ThreadFactory socketThreadFactory = new ThreadFactory() {
+      AtomicInteger connNum = new AtomicInteger(-1);
+
+      @Override
+      public Thread newThread(final Runnable command) {
+        String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+        getStats().incConnectionThreadsCreated();
+        Runnable r = new Runnable() {
+          public void run() {
+            try {
+              command.run();
+            } catch (CancelException e) { // bug 39463
+              // ignore
+            } finally {
+              ConnectionTable.releaseThreadsSockets();
+            }
+          }
+        };
+        return new Thread(socketThreadGroup, r, tName);
+      }
+    };
+    try {
+      if (isSelector()) {
+        return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
+            getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
+      } else {
+        return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
+            TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+      }
+    } catch (IllegalArgumentException poolInitException) {
+      this.stats.close();
+      this.serverSock.close();
+      throw poolInitException;
+    }
   }
 
   public long getAcceptorId() {
@@ -651,7 +684,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
-   *
+   * 
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
@@ -660,14 +693,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This system property is only used if max-threads == 0. This is for 5.0.2 backwards
    * compatibility.
-   *
+   * 
    * @deprecated since 5.1 use cache-server max-threads instead
    */
   @Deprecated
   private final static int DEPRECATED_SELECTOR_POOL_SIZE =
       Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
-  private final static int HANDSHAKE_POOL_SIZE =
-      Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue();
+  private final static int HANDSHAKE_POOL_SIZE = Integer
+      .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
 
   @Override
   public void start() throws IOException {
@@ -798,12 +831,13 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Ensure that the CachedRegionHelper and ServerConnection classes get loaded.
-   *
+   * 
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
-    if (emergencyClassesLoaded)
+    if (emergencyClassesLoaded) {
       return;
+    }
     emergencyClassesLoaded = true;
     CachedRegionHelper.loadEmergencyClasses();
     ServerConnection.loadEmergencyClasses();
@@ -870,8 +904,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   private Selector tmpSel;
 
   private void checkForStuckKeys() {
-    if (!WORKAROUND_SELECTOR_BUG)
+    if (!WORKAROUND_SELECTOR_BUG) {
       return;
+    }
     if (tmpSel == null) {
       try {
         tmpSel = Selector.open();
@@ -887,8 +922,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     while (it.hasNext()) {
       SelectionKey sk = (SelectionKey) it.next();
       ServerConnection sc = (ServerConnection) sk.attachment();
-      if (sc == null)
+      if (sc == null) {
         continue;
+      }
       try {
         sk.cancel();
         this.selector.selectNow(); // clear the cancelled key
@@ -1040,40 +1076,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
           break;
         }
         if (events == 0) {
-          // zeroEventsCount++;
-          // if (zeroEventsCount > 0) {
-          // zeroEventsCount = 0;
           checkForStuckKeys();
-
-          // try {
-          // this.selector.close(); // this selector is sick!
-          // } catch (IOException ignore) {
-          // }
-          // this.selector = Selector.open();
-          // {
-          // Iterator it = selectorRegistrations.iterator();
-          // while (it.hasNext()) {
-          // ServerConnection sc = (ServerConnection)it.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // }
-          // }
-          // ArrayList al = new ArrayList();
-          // Iterator keysIt = this.selector.keys().iterator();
-          // while (keysIt.hasNext()) {
-          // SelectionKey sk = (SelectionKey)keysIt.next();
-          // al.add(sk.attachment());
-          // sk.cancel();
-          // }
-          // events = this.selector.selectNow();
-          // Iterator alIt = al.iterator();
-          // while (alIt.hasNext()) {
-          // ServerConnection sc = (ServerConnection)alIt.next();
-          // sc.registerWithSelector2(this.selector);
-          // }
-          // events = this.selector.select();
-          // } else {
-          // zeroEventsCount = 0;
         }
         while (events > 0) {
           int cancelCount = 0;
@@ -1130,16 +1133,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
                   logger.warn(
                       LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
                 }
-                // } else if (key.isValid() && key.isConnectable()) {
-                // logger.info("DEBUG isConnectable and isValid key=" + key);
-                // finishCon(sc);
               } else {
                 finishCon(sc);
                 if (key.isValid()) {
                   logger.warn(LocalizedMessage.create(
                       LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key));
-                  // } else {
-                  // logger.info("DEBUG !isValid key=" + key);
                 }
               }
             } catch (CancelledKeyException ex) { // fix for bug 37739
@@ -1195,7 +1193,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * The work loop of this acceptor
-   *
+   * 
    * @see #accept
    */
   public void run() {
@@ -1405,6 +1403,10 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     return this.clientServerCnxCount.get();
   }
 
+  public boolean isNotifyBySubscription() {
+    return notifyBySubscription;
+  }
+
   protected void handleNewClientConnection(final Socket socket,
       final ServerConnectionFactory serverConnectionFactory) throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
@@ -1427,12 +1429,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       throw new EOFException();
     }
 
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-          primary ? "primary" : "secondary", socket);
-      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
-          this.notifyBySubscription);
+    // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue
+    // initialization to be done in another threadPool
+    if (initializeClientPools(socket, communicationMode)) {
       return;
     }
 
@@ -1498,6 +1497,17 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     }
   }
 
+  private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) {
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean isPrimaryServerToClient =
+          communicationMode == CommunicationMode.PrimaryServerToClient;
+      clientQueueInitPool
+          .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+      return true;
+    }
+    return false;
+  }
+
   private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
     socket.setSoTimeout(this.acceptTimeout);
     this.socketCreator.configureServerSSLSocket(socket);
@@ -1662,7 +1672,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    *        then calculate it.
    * @return the ip address or host name this acceptor will listen on. An "" if all local addresses
    *         will be listened to.
-   *
    * @since GemFire 5.7
    */
   private static String calcBindHostName(Cache cache, String bindName) {
@@ -1699,7 +1708,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
   /**
    * Gets the address that this bridge server can be contacted on from external processes.
-   *
+   * 
    * @since GemFire 5.7
    */
   public String getExternalAddress() {
@@ -1733,7 +1742,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
   /**
    * This method finds a client notifier and returns it. It is used to propagate interest
    * registrations to other servers
-   *
+   * 
    * @return the instance that provides client notification
    */
   @Override
@@ -1791,7 +1800,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
    * This method returns a thread safe structure which can be iterated over without worrying about
    * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get
    * client info.
-   *
    */
   public ServerConnection[] getAllServerConnectionList() {
     return this.allSCList;
@@ -1815,4 +1823,42 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
 
     releaseCommBuffer(Message.setTLCommBuffer(null));
   }
+
+  private class ClientQueueInitializerTask implements Runnable {
+    private final Socket socket;
+    private final boolean isPrimaryServerToClient;
+    private final AcceptorImpl acceptor;
+
+    public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
+        AcceptorImpl acceptor) {
+      this.socket = socket;
+      this.acceptor = acceptor;
+      this.isPrimaryServerToClient = isPrimaryServerToClient;
+    }
+
+    @Override
+    public void run() {
+      logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
+          isPrimaryServerToClient ? "primary" : "secondary", socket);
+      try {
+        acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
+            acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
+      } catch (IOException ex) {
+        closeSocket(socket);
+        if (isRunning()) {
+          if (!acceptor.loggedAcceptError) {
+            acceptor.loggedAcceptError = true;
+            if (ex instanceof SocketTimeoutException) {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT));
+            } else {
+              logger.warn(LocalizedMessage.create(
+                  LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0,
+                  ex), ex);
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 7fc688c..b141c6c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -72,7 +72,7 @@ import org.apache.geode.security.GemFireSecurityException;
  * Provides an implementation for the server socket end of the hierarchical cache connection. Each
  * server connection runs in its own thread to maximize concurrency and improve response times to
  * edge requests
- *
+ * 
  * @since GemFire 2.0.2
  */
 public abstract class ServerConnection implements Runnable {
@@ -189,16 +189,18 @@ public abstract class ServerConnection implements Runnable {
    */
   private volatile int requestSpecificTimeout = -1;
 
-  /** Tracks the id of the most recent batch to which a reply has been sent */
+  /**
+   * Tracks the id of the most recent batch to which a reply has been sent
+   */
   private int latestBatchIdReplied = -1;
 
   /*
    * Uniquely identifying the client's Distributed System
    *
-   * 
+   *
    * private String membershipId;
-   * 
-   * 
+   *
+   *
    * Uniquely identifying the client's ConnectionProxy object
    *
    *
@@ -711,8 +713,9 @@ public abstract class ServerConnection implements Runnable {
       // can be used.
       initializeCommands();
       // its initialized in verifyClientConnection call
-      if (!getCommunicationMode().isWAN())
+      if (!getCommunicationMode().isWAN()) {
         initializeClientUserAuths();
+      }
     }
     if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
       Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake),
@@ -892,7 +895,9 @@ public abstract class ServerConnection implements Runnable {
         }
       }
       if (unregisterClient)// last serverconnection call all close on auth objects
+      {
         cleanClientAuths();
+      }
       this.clientUserAuths = null;
       if (needsUnregister) {
         this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this);
@@ -917,8 +922,9 @@ public abstract class ServerConnection implements Runnable {
     ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode());
     ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua);
 
-    if (retCua == null)
+    if (retCua == null) {
       return cua;
+    }
     return retCua;
   }
 
@@ -954,8 +960,9 @@ public abstract class ServerConnection implements Runnable {
         boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId());
 
         // if not successfull, try the old way
-        if (!removed)
+        if (!removed) {
           removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive);
+        }
         return removed;
 
       } catch (NullPointerException npe) {
@@ -984,7 +991,7 @@ public abstract class ServerConnection implements Runnable {
         /*
          * This means that client and server VMs have different security settings. The server does
          * not have any security settings specified while client has.
-         * 
+         *
          * Here, should we just ignore this and send the dummy security part (connectionId, userId)
          * in the response (in this case, client needs to know that it is not expected to read any
          * security part in any of the server response messages) or just throw an exception
@@ -1010,7 +1017,6 @@ public abstract class ServerConnection implements Runnable {
         throw new AuthenticationFailedException("Authentication failed");
       }
 
-
       byte[] credBytes = msg.getPart(0).getSerializedForm();
 
       credBytes = ((HandShake) this.handshake).decryptBytes(credBytes);
@@ -1066,7 +1072,7 @@ public abstract class ServerConnection implements Runnable {
   /**
    * MessageType of the messages (typically internal commands) which do not need to participate in
    * security should be added in the following if block.
-   *
+   * 
    * @return Part
    * @see AbstractOp#processSecureBytes(Connection, Message)
    * @see AbstractOp#needsUserId()
@@ -1124,6 +1130,7 @@ public abstract class ServerConnection implements Runnable {
 
   public void run() {
     setOwner();
+
     if (getAcceptor().isSelector()) {
       boolean finishedMsg = false;
       try {
@@ -1136,9 +1143,7 @@ public abstract class ServerConnection implements Runnable {
             finishedMsg = true;
           }
         }
-      } catch (java.nio.channels.ClosedChannelException ignore) {
-        // ok shutting down
-      } catch (CancelException e) {
+      } catch (java.nio.channels.ClosedChannelException | CancelException ignore) {
         // ok shutting down
       } catch (IOException ex) {
         logger.warn(
@@ -1187,6 +1192,7 @@ public abstract class ServerConnection implements Runnable {
    * If registered with a selector then this will be the key we are registered with.
    */
   // private SelectionKey sKey = null;
+
   /**
    * Register this connection with the given selector for read events. Note that switch the channel
    * to non-blocking so it can be in a selector.
@@ -1202,7 +1208,8 @@ public abstract class ServerConnection implements Runnable {
   }
 
   public void registerWithSelector2(Selector s) throws IOException {
-    /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
+    /* this.sKey = */
+    getSelectableChannel().register(s, SelectionKey.OP_READ, this);
   }
 
   /**
@@ -1225,7 +1232,6 @@ public abstract class ServerConnection implements Runnable {
   }
 
   /**
-   *
    * @return String representing the DistributedSystemMembership of the Client VM
    */
   public String getMembershipID() {
@@ -1265,10 +1271,11 @@ public abstract class ServerConnection implements Runnable {
   }
 
   protected int getClientReadTimeout() {
-    if (this.requestSpecificTimeout == -1)
+    if (this.requestSpecificTimeout == -1) {
       return this.handshake.getClientReadTimeout();
-    else
+    } else {
       return this.requestSpecificTimeout;
+    }
   }
 
   protected boolean isProcessingMessage() {
@@ -1492,7 +1499,7 @@ public abstract class ServerConnection implements Runnable {
 
   /**
    * Just ensure that this class gets loaded.
-   *
+   * 
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
@@ -1519,7 +1526,9 @@ public abstract class ServerConnection implements Runnable {
     return this.name;
   }
 
-  /** returns the name of this connection */
+  /**
+   * returns the name of this connection
+   */
   public String getName() {
     return this.name;
   }
@@ -1736,11 +1745,13 @@ public abstract class ServerConnection implements Runnable {
     // for backward client it will be store in member variable userAuthId
     // for other look "requestMsg" here and get unique-id from this to get the authzrequest
 
-    if (!AcceptorImpl.isAuthenticationRequired())
+    if (!AcceptorImpl.isAuthenticationRequired()) {
       return null;
+    }
 
-    if (AcceptorImpl.isIntegratedSecurity())
+    if (AcceptorImpl.isIntegratedSecurity()) {
       return null;
+    }
 
     long uniqueId = getUniqueId();
 
@@ -1768,11 +1779,13 @@ public abstract class ServerConnection implements Runnable {
 
   public AuthorizeRequestPP getPostAuthzRequest()
       throws AuthenticationRequiredException, IOException {
-    if (!AcceptorImpl.isAuthenticationRequired())
+    if (!AcceptorImpl.isAuthenticationRequired()) {
       return null;
+    }
 
-    if (AcceptorImpl.isIntegratedSecurity())
+    if (AcceptorImpl.isIntegratedSecurity()) {
       return null;
+    }
 
     // look client version and return authzrequest
     // for backward client it will be store in member variable userAuthId
@@ -1799,7 +1812,9 @@ public abstract class ServerConnection implements Runnable {
     return postAuthReq;
   }
 
-  /** returns the member ID byte array to be used for creating EventID objects */
+  /**
+   * returns the member ID byte array to be used for creating EventID objects
+   */
   public byte[] getEventMemberIDByteArray() {
     return this.memberIdByteArray;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
index 357736b..8904380 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
@@ -21,6 +21,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -44,7 +45,8 @@ public class SSLConfigJUnitTest {
   private static final Properties GATEWAY_SSL_PROPS_MAP = new Properties();
   private static final Properties GATEWAY_PROPS_SUBSET_MAP = new Properties();
 
-  static {
+  @BeforeClass
+  public static void initializeSSLMaps() {
 
     SSL_PROPS_MAP.put("javax.net.ssl.keyStoreType", "jks");
     SSL_PROPS_MAP.put("javax.net.ssl.keyStore", "/export/gemfire-configs/gemfire.keystore");
@@ -383,7 +385,6 @@ public class SSLConfigJUnitTest {
     gemFireProps.put(CLUSTER_SSL_CIPHERS, sslciphers);
     gemFireProps.put(CLUSTER_SSL_REQUIRE_AUTHENTICATION, String.valueOf(requireAuth));
 
-
     gemFireProps.put(SERVER_SSL_ENABLED, String.valueOf(cacheServerSslenabled));
     gemFireProps.put(SERVER_SSL_PROTOCOLS, cacheServerSslprotocols);
     gemFireProps.put(SERVER_SSL_CIPHERS, cacheServerSslciphers);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
new file mode 100644
index 0000000..f5a2e63
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.rmi.RemoteException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.server.ClientSubscriptionConfig;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedLockBlackboard;
+import org.apache.geode.distributed.DistributedLockBlackboardImpl;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class AcceptorImplClientQueueDUnitTest implements Serializable {
+  private final Host host = Host.getHost(0);
+  private final static int numberOfEntries = 200;
+  private final static AtomicInteger eventCount = new AtomicInteger(0);
+  private final static AtomicBoolean completedClient2 = new AtomicBoolean(false);
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  @Rule
+  public CacheRule cacheRule =
+      CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
+          .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
+
+  @Rule
+  public SerializableTestName name = new SerializableTestName();
+
+  @Rule
+  public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
+
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties =
+      new DistributedRestoreSystemProperties();
+
+  private DistributedLockBlackboard blackboard = null;
+
+  @Before
+  public void setup() throws Exception {
+    blackboard = DistributedLockBlackboardImpl.getInstance();
+  }
+
+  @After
+  public void tearDown() throws RemoteException {
+    blackboard.initCount();
+    host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
+      InitialImageOperation.slowImageProcessing = 0;
+      System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
+    }));
+  }
+
+  @Test
+  public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception {
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+    int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
+      try {
+        return createSubscriptionServer(cacheRule.getCache());
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolSubscriptionEnabled(true);
+      clientCacheFactory.setPoolSubscriptionRedundancy(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+          .set("durable-client-timeout", "300").set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+      cache.readyForEvents();
+      cache.close(true);
+    });
+    vm3.invoke("Start Client2 to add entries to region", () -> {
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+      ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      for (int i = 0; i < numberOfEntries; i++) {
+        region.put(i, i);
+      }
+      cache.close();
+    });
+
+    int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
+      try {
+        int serverPort = createSubscriptionServer(cacheRule.getCache());
+        InitialImageOperation.slowImageProcessing = 30;
+        return serverPort;
+      } catch (IOException e) {
+        return 0;
+      }
+    });
+
+    vm0.invoke("Turn on slow image processsing", () -> {
+      InitialImageOperation.slowImageProcessing = 30;
+    });
+
+    AsyncInvocation<Boolean> completedClient1 =
+        vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+
+          ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+          clientCacheFactory.setPoolSubscriptionEnabled(true);
+          clientCacheFactory.setPoolSubscriptionRedundancy(1);
+          clientCacheFactory.setPoolMinConnections(1);
+          clientCacheFactory.setPoolMaxConnections(1);
+          clientCacheFactory.setPoolReadTimeout(200);
+          clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+          ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
+              .set("durable-client-timeout", "300").set("mcast-port", "0");
+          blackboard.incCount();
+          ClientCache cache = cacheFactory.create();
+
+          ClientRegionFactory<Object, Object> clientRegionFactory =
+              cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+          Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() {
+            @Override
+            public void afterCreate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+
+            @Override
+            public void afterUpdate(EntryEvent event) {
+              eventCount.incrementAndGet();
+            }
+          }).create("subscriptionRegion");
+
+          region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+          cache.readyForEvents();
+          Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+              .until(() -> eventCount.get() == numberOfEntries);
+          cache.close();
+          return eventCount.get() == numberOfEntries;
+        });
+
+    vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+      while (true) {
+        Thread.sleep(100);
+        if (blackboard.getCount() == 1) {
+          break;
+        }
+      }
+      ClientCache cache = null;
+      ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+      clientCacheFactory.setPoolRetryAttempts(0);
+      clientCacheFactory.setPoolMinConnections(1);
+      clientCacheFactory.setPoolMaxConnections(1);
+      clientCacheFactory.setPoolReadTimeout(200);
+      clientCacheFactory.setPoolSocketConnectTimeout(500);
+      clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+      cache = clientCacheFactory.set("mcast-port", "0").create();
+      ClientRegionFactory<Object, Object> clientRegionFactory =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region region = clientRegionFactory.create("subscriptionRegion");
+
+      int returnValue = 0;
+      for (int i = 0; i < 100; i++) {
+        returnValue = (int) region.get(i);
+      }
+      cache.close();
+      completedClient2.set(returnValue == 99);
+    });
+    assertTrue(completedClient1.get());
+    assertTrue(vm3.invoke(() -> completedClient2.get()));
+  }
+
+  private int createSubscriptionServer(InternalCache cache) throws IOException {
+    initializeDiskStore(cache);
+    initializeReplicateRegion(cache);
+    return initializeCacheServerWithSubscription(host, cache);
+  }
+
+  private void initializeDiskStore(InternalCache cache) throws IOException {
+    DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
+    diskStoreAttributes.name = "clientQueueDS";
+    diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")};
+    cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
+  }
+
+  private void initializeReplicateRegion(InternalCache cache) {
+    cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
+        .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL))
+        .create("subscriptionRegion");
+  }
+
+  private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
+      throws IOException {
+    CacheServer cacheServer1 = cache.addCacheServer(false);
+    ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig();
+    clientSubscriptionConfig.setEvictionPolicy("entry");
+    clientSubscriptionConfig.setCapacity(5);
+    clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
+    cacheServer1.setPort(0);
+    cacheServer1.setHostnameForClients(host.getHostName());
+    cacheServer1.start();
+    return cacheServer1.getPort();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
index dc42da8..b65bf86 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
@@ -63,6 +63,7 @@ public class CacheRule extends DistributedExternalResource {
   private final boolean disconnectAfter;
   private final List<VM> createCacheInVMs;
   private final Properties config;
+  private final Properties systemProperties;
 
   public static Builder builder() {
     return new Builder();
@@ -74,18 +75,19 @@ public class CacheRule extends DistributedExternalResource {
     this.disconnectAfter = builder.disconnectAfter;
     this.createCacheInVMs = builder.createCacheInVMs;
     this.config = builder.config;
+    this.systemProperties = builder.systemProperties;
   }
 
   @Override
   protected void before() {
     if (createCacheInAll) {
-      invoker().invokeInEveryVMAndController(() -> createCache(config));
+      invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties));
     } else {
       if (createCache) {
-        createCache(config);
+        createCache(config, systemProperties);
       }
       for (VM vm : createCacheInVMs) {
-        vm.invoke(() -> createCache(config));
+        vm.invoke(() -> createCache(config, systemProperties));
       }
     }
   }
@@ -108,7 +110,8 @@ public class CacheRule extends DistributedExternalResource {
     return cache.getInternalDistributedSystem();
   }
 
-  private static void createCache(final Properties config) {
+  private static void createCache(final Properties config, final Properties systemProperties) {
+    System.getProperties().putAll(systemProperties);
     cache = (InternalCache) new CacheFactory(config).create();
   }
 
@@ -141,6 +144,7 @@ public class CacheRule extends DistributedExternalResource {
     private boolean disconnectAfter;
     private List<VM> createCacheInVMs = new ArrayList<>();
     private Properties config = new Properties();
+    private Properties systemProperties = new Properties();
 
     public Builder() {
       config.setProperty(LOCATORS, getLocators());
@@ -195,6 +199,16 @@ public class CacheRule extends DistributedExternalResource {
       return this;
     }
 
+    public Builder addSystemProperty(final String key, final String value) {
+      this.systemProperties.put(key, value);
+      return this;
+    }
+
+    public Builder addSystemProperties(final Properties config) {
+      this.systemProperties.putAll(config);
+      return this;
+    }
+
     public CacheRule build() {
       return new CacheRule(this);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.