You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2021/03/09 23:21:26 UTC

[geode] branch support/1.14 updated: GEODE-8761: detect stuck server connection threads (#6094) (#6105)

This is an automated email from the ASF dual-hosted git repository.

dschneider pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new 2bb7e34  GEODE-8761: detect stuck server connection threads (#6094) (#6105)
2bb7e34 is described below

commit 2bb7e34ab1ef3a5c0965b907ca46c7ea38dde378
Author: Darrel Schneider <da...@vmware.com>
AuthorDate: Tue Mar 9 15:20:23 2021 -0800

    GEODE-8761: detect stuck server connection threads (#6094) (#6105)
    
    (cherry picked from commit df7b5b167472d87d8d262515533e7c80d7306e67)
---
 .../sockets/ServerConnectionIntegrationTest.java   | 20 +++++++++--
 .../cache/tier/sockets/ServerConnection.java       | 36 +++++++++++++++++++
 .../internal/monitoring/ThreadsMonitoring.java     |  3 +-
 .../internal/monitoring/ThreadsMonitoringImpl.java |  3 ++
 .../executor/P2PReaderExecutorGroup.java           | 22 +-----------
 ...oup.java => ServerConnectionExecutorGroup.java} | 26 ++------------
 ...ExecutorGroup.java => SuspendableExecutor.java} | 10 ++----
 .../tier/sockets/ServerConnectionFactoryTest.java  | 40 ++++++++++++++++++----
 .../cache/tier/sockets/ServerConnectionTest.java   | 19 ++++++++--
 .../monitoring/ThreadsMonitoringImplJUnitTest.java |  1 +
 .../monitoring/ThreadsMonitoringJUnitTest.java     |  5 +--
 .../executor/P2PReaderExecutorGroupTest.java       | 25 --------------
 .../ServerConnectionExecutorGroupTest.java}        | 29 ++++------------
 ...GroupTest.java => SuspendableExecutorTest.java} | 14 ++++----
 .../OutputCapturingServerConnectionTest.java       |  9 +++++
 .../tier/sockets/ProtobufServerConnectionTest.java | 12 +++++--
 16 files changed, 151 insertions(+), 123 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java
index 48ea5e7..17e79c3 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java
@@ -18,7 +18,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.mockito.quality.Strictness.STRICT_STUBS;
+import static org.mockito.quality.Strictness.LENIENT;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -35,6 +35,8 @@ import org.junit.experimental.categories.Category;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.TXManagerImpl;
@@ -43,6 +45,7 @@ import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -51,11 +54,12 @@ import org.apache.geode.test.junit.categories.ClientServerTest;
 public class ServerConnectionIntegrationTest {
 
   @Rule
-  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(LENIENT);
 
   private AcceptorImpl acceptor;
   private Socket socket;
   private InternalCache cache;
+  private CachedRegionHelper cachedRegionHelper;
   private SecurityService securityService;
   private CacheServerStats stats;
 
@@ -66,11 +70,21 @@ public class ServerConnectionIntegrationTest {
     acceptor = mock(AcceptorImpl.class);
     socket = mock(Socket.class);
     cache = mock(InternalCache.class);
+    cachedRegionHelper = mock(CachedRegionHelper.class);
     securityService = mock(SecurityService.class);
     stats = mock(CacheServerStats.class);
 
     when(inetAddress.getHostAddress()).thenReturn("localhost");
     when(socket.getInetAddress()).thenReturn(inetAddress);
+
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
+    when(cachedRegionHelper.getCache()).thenReturn(cache);
+    when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+    when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
   }
 
   /**
@@ -89,7 +103,7 @@ public class ServerConnectionIntegrationTest {
     when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
 
     TestServerConnection testServerConnection =
-        new TestServerConnection(socket, cache, mock(CachedRegionHelper.class), stats, 0, 0, null,
+        new TestServerConnection(socket, cache, cachedRegionHelper, stats, 0, 0, null,
             CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
 
     assertThatCode(() -> testServerConnection.run()).doesNotThrowAnyException();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 625da9e..9e219c1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP;
 import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
+import static org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode.ServerConnectionExecutor;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -65,6 +66,8 @@ import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
 import org.apache.geode.internal.cache.tier.sockets.command.Default;
 import org.apache.geode.internal.logging.InternalLogWriter;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
 import org.apache.geode.internal.security.AuthorizeRequest;
 import org.apache.geode.internal.security.AuthorizeRequestPP;
 import org.apache.geode.internal.security.SecurityService;
@@ -122,6 +125,13 @@ public abstract class ServerConnection implements Runnable {
 
   private final ProcessingMessageTimer processingMessageTimer = new ProcessingMessageTimer();
 
+  private final ThreadsMonitoring threadMonitoring;
+  /**
+   * The threadMonitorExecutor for this server connection.
+   * This will be null if the acceptor is a selector.
+   */
+  protected final AbstractExecutor threadMonitorExecutor;
+
   public static ByteBuffer allocateCommBuffer(int size, Socket sock) {
     // I expect that size will almost always be the same value
     if (sock.getChannel() == null) {
@@ -304,6 +314,16 @@ public abstract class ServerConnection implements Runnable {
         logger.debug("While creating server connection", e);
       }
     }
+    threadMonitoring = getCache().getInternalDistributedSystem().getDM().getThreadMonitoring();
+    if (getAcceptor().isSelector()) {
+      // When a selector is used, the thread pool that processes client requests
+      // automatically registers the thread processing the request with the thread monitor,
+      // so there is no need to create a threadMonitorExecutor.
+      threadMonitorExecutor = null;
+    } else {
+      threadMonitorExecutor = threadMonitoring.createAbstractExecutor(ServerConnectionExecutor);
+      suspendThreadMonitoring();
+    }
   }
 
   public Acceptor getAcceptor() {
@@ -778,6 +798,7 @@ public abstract class ServerConnection implements Runnable {
     }
 
     ThreadState threadState = null;
+    resumeThreadMonitoring();
     try {
       if (message != null) {
         // Since this thread is not interrupted when the cache server is shutdown, test again after
@@ -847,6 +868,7 @@ public abstract class ServerConnection implements Runnable {
         command.execute(message, this, securityService);
       }
     } finally {
+      suspendThreadMonitoring();
       // Keep track of the fact that a message is no longer being
       // processed.
       serverConnectionCollection.connectionsProcessing.decrementAndGet();
@@ -858,6 +880,18 @@ public abstract class ServerConnection implements Runnable {
     }
   }
 
+  private void suspendThreadMonitoring() {
+    if (threadMonitorExecutor != null) {
+      threadMonitorExecutor.suspendMonitoring();
+    }
+  }
+
+  private void resumeThreadMonitoring() {
+    if (threadMonitorExecutor != null) {
+      threadMonitorExecutor.resumeMonitoring();
+    }
+  }
+
   private final Object terminationLock = new Object();
   private boolean terminated;
 
@@ -1206,6 +1240,7 @@ public abstract class ServerConnection implements Runnable {
         }
       }
     } else {
+      threadMonitoring.register(threadMonitorExecutor);
       try {
         while (processMessages && !crHelper.isShutdown()) {
           try {
@@ -1218,6 +1253,7 @@ public abstract class ServerConnection implements Runnable {
           }
         }
       } finally {
+        threadMonitoring.unregister(threadMonitorExecutor);
         try {
           unsetRequestSpecificTimeout();
           handleTermination();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
index aa5ba6d..2210aed 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
@@ -28,7 +28,8 @@ public interface ThreadsMonitoring {
     OneTaskOnlyExecutor,
     ScheduledThreadExecutor,
     AGSExecutor,
-    P2PReaderExecutor
+    P2PReaderExecutor,
+    ServerConnectionExecutor
   };
 
   Map<Long, AbstractExecutor> getMonitorMap();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
index 1c0e49c..a2946f9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
@@ -30,6 +30,7 @@ import org.apache.geode.internal.monitoring.executor.P2PReaderExecutorGroup;
 import org.apache.geode.internal.monitoring.executor.PooledExecutorGroup;
 import org.apache.geode.internal.monitoring.executor.ScheduledThreadPoolExecutorWKAGroup;
 import org.apache.geode.internal.monitoring.executor.SerialQueuedExecutorGroup;
+import org.apache.geode.internal.monitoring.executor.ServerConnectionExecutorGroup;
 
 public class ThreadsMonitoringImpl implements ThreadsMonitoring {
 
@@ -134,6 +135,8 @@ public class ThreadsMonitoringImpl implements ThreadsMonitoring {
         return new GatewaySenderEventProcessorGroup();
       case P2PReaderExecutor:
         return new P2PReaderExecutorGroup();
+      case ServerConnectionExecutor:
+        return new ServerConnectionExecutorGroup();
       default:
         throw new IllegalStateException("Unhandled mode=" + mode);
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
index 6ee9909..3e56de5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
@@ -14,30 +14,10 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-public class P2PReaderExecutorGroup extends AbstractExecutor {
-
+public class P2PReaderExecutorGroup extends SuspendableExecutor {
   public static final String GROUP_NAME = "P2PReaderExecutor";
 
-  private volatile boolean suspended;
-
   public P2PReaderExecutorGroup() {
     super(GROUP_NAME);
   }
-
-  @Override
-  public void suspendMonitoring() {
-    suspended = true;
-  }
-
-  @Override
-  public void resumeMonitoring() {
-    setStartTime(0);
-    suspended = false;
-  }
-
-  @Override
-  public boolean isMonitoringSuspended() {
-    return suspended;
-  }
-
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroup.java
similarity index 65%
copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
copy to geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroup.java
index 6ee9909..3dd4bb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroup.java
@@ -14,30 +14,10 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-public class P2PReaderExecutorGroup extends AbstractExecutor {
+public class ServerConnectionExecutorGroup extends SuspendableExecutor {
+  public static final String GROUP_NAME = "ServerConnectionExecutor";
 
-  public static final String GROUP_NAME = "P2PReaderExecutor";
-
-  private volatile boolean suspended;
-
-  public P2PReaderExecutorGroup() {
+  public ServerConnectionExecutorGroup() {
     super(GROUP_NAME);
   }
-
-  @Override
-  public void suspendMonitoring() {
-    suspended = true;
-  }
-
-  @Override
-  public void resumeMonitoring() {
-    setStartTime(0);
-    suspended = false;
-  }
-
-  @Override
-  public boolean isMonitoringSuspended() {
-    return suspended;
-  }
-
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
similarity index 85%
copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
copy to geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
index 6ee9909..9e4849b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
@@ -14,14 +14,11 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-public class P2PReaderExecutorGroup extends AbstractExecutor {
-
-  public static final String GROUP_NAME = "P2PReaderExecutor";
-
+public abstract class SuspendableExecutor extends AbstractExecutor {
   private volatile boolean suspended;
 
-  public P2PReaderExecutorGroup() {
-    super(GROUP_NAME);
+  public SuspendableExecutor(String groupName) {
+    super(groupName);
   }
 
   @Override
@@ -39,5 +36,4 @@ public class P2PReaderExecutorGroup extends AbstractExecutor {
   public boolean isMonitoringSuspended() {
     return suspended;
   }
-
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 1c67e01..46a17c7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -28,6 +28,7 @@ import java.net.Socket;
 import junitparams.JUnitParamsRunner;
 import junitparams.Parameters;
 import junitparams.naming.TestCaseName;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.RestoreSystemProperties;
@@ -36,10 +37,13 @@ import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
-import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
 import org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 
@@ -61,6 +65,26 @@ public class ServerConnectionFactoryTest {
   @Rule
   public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
 
+  private InternalCacheForClientAccess cache;
+  private CachedRegionHelper cachedRegionHelper;
+
+  @Before
+  public void setUp() throws IOException {
+    cache = mock(InternalCacheForClientAccess.class);
+    cachedRegionHelper = mock(CachedRegionHelper.class);
+  }
+
+  private void setupThreadMonitoring() {
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
+    when(cachedRegionHelper.getCache()).thenReturn(cache);
+    when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+    when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
+  }
+
   /**
    * Safeguard that we won't create the new client protocol object unless the feature flag is
    * enabled.
@@ -69,7 +93,7 @@ public class ServerConnectionFactoryTest {
   public void newClientProtocolFailsWithoutSystemPropertySet() {
     Throwable thrown = catchThrowable(
         () -> new ServerConnectionFactory().makeServerConnection(mock(Socket.class),
-            mock(InternalCache.class), mock(CachedRegionHelper.class), mock(CacheServerStats.class),
+            cache, cachedRegionHelper, mock(CacheServerStats.class),
             0, 0, "", CommunicationMode.ProtobufClientServerProtocol.getModeNumber(),
             mock(AcceptorImpl.class), mock(SecurityService.class)));
 
@@ -81,7 +105,7 @@ public class ServerConnectionFactoryTest {
     System.setProperty("geode.feature-protobuf-protocol", "true");
     Throwable thrown = catchThrowable(
         () -> new ServerConnectionFactory().makeServerConnection(mock(Socket.class),
-            mock(InternalCache.class), mock(CachedRegionHelper.class), mock(CacheServerStats.class),
+            cache, cachedRegionHelper, mock(CacheServerStats.class),
             0, 0, "", CommunicationMode.ProtobufClientServerProtocol.getModeNumber(),
             mock(AcceptorImpl.class), mock(SecurityService.class)));
 
@@ -97,10 +121,11 @@ public class ServerConnectionFactoryTest {
     Socket socket = mock(Socket.class);
     when(socket.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
     when(socket.getInputStream()).thenReturn(mock(InputStream.class));
+    setupThreadMonitoring();
 
     ServerConnection serverConnection =
-        new ServerConnectionFactory().makeServerConnection(socket, mock(InternalCache.class),
-            mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
+        new ServerConnectionFactory().makeServerConnection(socket,
+            cache, cachedRegionHelper, mock(CacheServerStats.class), 0, 0, "",
             communicationMode.getModeNumber(),
             mock(AcceptorImpl.class), mock(SecurityService.class));
 
@@ -118,10 +143,11 @@ public class ServerConnectionFactoryTest {
     Socket socket = mock(Socket.class);
     when(socket.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
     when(socket.getInputStream()).thenReturn(mock(InputStream.class));
+    setupThreadMonitoring();
 
     ServerConnection serverConnection =
-        new ServerConnectionFactory().makeServerConnection(socket, mock(InternalCache.class),
-            mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
+        new ServerConnectionFactory().makeServerConnection(socket, cache,
+            cachedRegionHelper, mock(CacheServerStats.class), 0, 0, "",
             communicationMode.getModeNumber(),
             mock(AcceptorImpl.class), mock(SecurityService.class));
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index d2b0671..a8de093 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -36,11 +36,14 @@ import org.junit.experimental.categories.Category;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
-import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.Encryptor;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -68,10 +71,20 @@ public class ServerConnectionTest {
 
     when(inetAddress.getHostAddress()).thenReturn("localhost");
     when(socket.getInetAddress()).thenReturn(inetAddress);
+    InternalCacheForClientAccess cache = mock(InternalCacheForClientAccess.class);
+    CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
+    when(cachedRegionHelper.getCache()).thenReturn(cache);
+    when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+    when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
 
     serverConnection =
-        new ServerConnectionFactory().makeServerConnection(socket, mock(InternalCache.class),
-            mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, null,
+        new ServerConnectionFactory().makeServerConnection(socket, cache,
+            cachedRegionHelper, mock(CacheServerStats.class), 0, 0, null,
             CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
             mock(SecurityService.class));
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
index 7840fbb..78af92a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
@@ -62,6 +62,7 @@ public class ThreadsMonitoringImplJUnitTest {
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.ScheduledThreadExecutor));
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.AGSExecutor));
     assertTrue(threadsMonitoringImpl.startMonitor(Mode.P2PReaderExecutor));
+    assertTrue(threadsMonitoringImpl.startMonitor(Mode.ServerConnectionExecutor));
   }
 
   @Test
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
index 3827308..ff6f8ac 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
@@ -35,11 +35,12 @@ public class ThreadsMonitoringJUnitTest {
     OneTaskOnlyExecutor,
     ScheduledThreadExecutor,
     AGSExecutor,
-    P2PReaderExecutor
+    P2PReaderExecutor,
+    ServerConnectionExecutor
   };
 
 
-  public final int numberOfElements = 7;
+  public final int numberOfElements = 8;
   private static final Logger logger = LogService.getLogger();
 
   /**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
index c266c4c..5a4aac2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
@@ -25,29 +25,4 @@ public class P2PReaderExecutorGroupTest {
     assertThat(new P2PReaderExecutorGroup().getGroupName())
         .isEqualTo(P2PReaderExecutorGroup.GROUP_NAME);
   }
-
-  @Test
-  public void verifySuspendLifecycle() {
-    P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
-    assertThat(executor.isMonitoringSuspended()).isFalse();
-    executor.suspendMonitoring();
-    assertThat(executor.isMonitoringSuspended()).isTrue();
-    executor.suspendMonitoring();
-    assertThat(executor.isMonitoringSuspended()).isTrue();
-    executor.resumeMonitoring();
-    assertThat(executor.isMonitoringSuspended()).isFalse();
-    executor.resumeMonitoring();
-    assertThat(executor.isMonitoringSuspended()).isFalse();
-    executor.suspendMonitoring();
-    assertThat(executor.isMonitoringSuspended()).isTrue();
-  }
-
-  @Test
-  public void verifyResumeClearsStartTime() {
-    P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
-    executor.setStartTime(1);
-    assertThat(executor.getStartTime()).isEqualTo(1);
-    executor.resumeMonitoring();
-    assertThat(executor.getStartTime()).isEqualTo(0);
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroupTest.java
similarity index 64%
copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
copy to geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroupTest.java
index 6ee9909..5dc99f0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroupTest.java
@@ -14,30 +14,15 @@
  */
 package org.apache.geode.internal.monitoring.executor;
 
-public class P2PReaderExecutorGroup extends AbstractExecutor {
+import static org.assertj.core.api.Assertions.assertThat;
 
-  public static final String GROUP_NAME = "P2PReaderExecutor";
+import org.junit.Test;
 
-  private volatile boolean suspended;
+public class ServerConnectionExecutorGroupTest {
 
-  public P2PReaderExecutorGroup() {
-    super(GROUP_NAME);
+  @Test
+  public void testVerifyGroupName() {
+    assertThat(new ServerConnectionExecutorGroup().getGroupName())
+        .isEqualTo(ServerConnectionExecutorGroup.GROUP_NAME);
   }
-
-  @Override
-  public void suspendMonitoring() {
-    suspended = true;
-  }
-
-  @Override
-  public void resumeMonitoring() {
-    setStartTime(0);
-    suspended = false;
-  }
-
-  @Override
-  public boolean isMonitoringSuspended() {
-    return suspended;
-  }
-
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
similarity index 83%
copy from geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
copy to geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
index c266c4c..5cc9afb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
@@ -18,17 +18,17 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import org.junit.Test;
 
-public class P2PReaderExecutorGroupTest {
+public class SuspendableExecutorTest {
 
-  @Test
-  public void testVerifyGroupName() {
-    assertThat(new P2PReaderExecutorGroup().getGroupName())
-        .isEqualTo(P2PReaderExecutorGroup.GROUP_NAME);
+  public static class FakeSuspendableExecutor extends SuspendableExecutor {
+    public FakeSuspendableExecutor() {
+      super("FakeSuspendableExecutor");
+    }
   }
 
   @Test
   public void verifySuspendLifecycle() {
-    P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
+    SuspendableExecutor executor = new FakeSuspendableExecutor();
     assertThat(executor.isMonitoringSuspended()).isFalse();
     executor.suspendMonitoring();
     assertThat(executor.isMonitoringSuspended()).isTrue();
@@ -44,7 +44,7 @@ public class P2PReaderExecutorGroupTest {
 
   @Test
   public void verifyResumeClearsStartTime() {
-    P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
+    SuspendableExecutor executor = new FakeSuspendableExecutor();
     executor.setStartTime(1);
     assertThat(executor.getStartTime()).isEqualTo(1);
     executor.resumeMonitoring();
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
index a3996f8..779a351 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
@@ -35,10 +35,13 @@ import org.junit.experimental.categories.Category;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCacheForClientAccess;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 
@@ -64,9 +67,15 @@ public class OutputCapturingServerConnectionTest {
     socket = mock(Socket.class);
     cache = mock(InternalCacheForClientAccess.class);
     cachedRegionHelper = mock(CachedRegionHelper.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
 
     when(acceptor.getClientHealthMonitor()).thenReturn(mock(ClientHealthMonitor.class));
     when(cachedRegionHelper.getCache()).thenReturn(cache);
+    when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+    when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
     when(socket.getInetAddress()).thenReturn(mock(InetAddress.class));
     when(socket.getOutputStream()).thenReturn(new ByteArrayOutputStream());
     when(socket.getRemoteSocketAddress()).thenReturn(createUnresolved("localhost", 9071));
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
index b408d4f..5b74cc3 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
@@ -38,11 +38,14 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.junit.MockitoJUnit;
 import org.mockito.junit.MockitoRule;
 
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCacheForClientAccess;
 import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.test.junit.categories.ClientServerTest;
 
@@ -65,18 +68,24 @@ public class ProtobufServerConnectionTest {
     cachedRegionHelper = mock(CachedRegionHelper.class);
     clientHealthMonitor = mock(ClientHealthMonitor.class);
     socket = mock(Socket.class);
+    InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
 
     when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
     when(socket.getInetAddress()).thenReturn(mock(InetAddress.class));
     when(socket.getOutputStream()).thenReturn(new ByteArrayOutputStream());
     when(socket.getRemoteSocketAddress()).thenReturn(createUnresolved("localhost", 9071));
+    when(cachedRegionHelper.getCache()).thenReturn(cache);
+    when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+    when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
   }
 
   @Test
   public void doOneMessageUnsetsProcessMessagesFlag() throws Exception {
     ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
     doThrow(new IOException("throw me")).when(clientProtocolProcessor).processMessage(any(), any());
-    when(cachedRegionHelper.getCache()).thenReturn(cache);
     ServerConnection serverConnection = new ProtobufServerConnection(socket, cache,
         cachedRegionHelper, mock(CacheServerStats.class), 0, 1024, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptor,
@@ -133,7 +142,6 @@ public class ProtobufServerConnectionTest {
 
   @Test
   public void doOneMessageNotifiesClientHealthMonitorOfPing() throws IOException {
-    when(cachedRegionHelper.getCache()).thenReturn(cache);
     ServerConnection serverConnection = new ProtobufServerConnection(socket, cache,
         cachedRegionHelper, mock(CacheServerStats.class), 0, 1024, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptor,