You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2021/03/09 17:01:48 UTC
[geode] branch develop updated: GEODE-8761: detect stuck server connection threads (#6094)
This is an automated email from the ASF dual-hosted git repository.
dschneider pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new df7b5b1 GEODE-8761: detect stuck server connection threads (#6094)
df7b5b1 is described below
commit df7b5b167472d87d8d262515533e7c80d7306e67
Author: Darrel Schneider <da...@vmware.com>
AuthorDate: Tue Mar 9 09:00:18 2021 -0800
GEODE-8761: detect stuck server connection threads (#6094)
---
.../sockets/ServerConnectionIntegrationTest.java | 20 +++++++++--
.../cache/tier/sockets/ServerConnection.java | 36 +++++++++++++++++++
.../internal/monitoring/ThreadsMonitoring.java | 3 +-
.../internal/monitoring/ThreadsMonitoringImpl.java | 3 ++
.../executor/P2PReaderExecutorGroup.java | 22 +-----------
...oup.java => ServerConnectionExecutorGroup.java} | 26 ++------------
...ExecutorGroup.java => SuspendableExecutor.java} | 10 ++----
.../tier/sockets/ServerConnectionFactoryTest.java | 40 ++++++++++++++++++----
.../cache/tier/sockets/ServerConnectionTest.java | 19 ++++++++--
.../monitoring/ThreadsMonitoringImplJUnitTest.java | 1 +
.../monitoring/ThreadsMonitoringJUnitTest.java | 5 +--
.../executor/P2PReaderExecutorGroupTest.java | 25 --------------
.../ServerConnectionExecutorGroupTest.java} | 29 ++++------------
...GroupTest.java => SuspendableExecutorTest.java} | 14 ++++----
.../OutputCapturingServerConnectionTest.java | 9 +++++
.../tier/sockets/ProtobufServerConnectionTest.java | 12 +++++--
16 files changed, 151 insertions(+), 123 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java
index 48ea5e7..17e79c3 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionIntegrationTest.java
@@ -18,7 +18,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.mockito.quality.Strictness.STRICT_STUBS;
+import static org.mockito.quality.Strictness.LENIENT;
import java.io.IOException;
import java.net.InetAddress;
@@ -35,6 +35,8 @@ import org.junit.experimental.categories.Category;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.TXManagerImpl;
@@ -43,6 +45,7 @@ import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -51,11 +54,12 @@ import org.apache.geode.test.junit.categories.ClientServerTest;
public class ServerConnectionIntegrationTest {
@Rule
- public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+ public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(LENIENT);
private AcceptorImpl acceptor;
private Socket socket;
private InternalCache cache;
+ private CachedRegionHelper cachedRegionHelper;
private SecurityService securityService;
private CacheServerStats stats;
@@ -66,11 +70,21 @@ public class ServerConnectionIntegrationTest {
acceptor = mock(AcceptorImpl.class);
socket = mock(Socket.class);
cache = mock(InternalCache.class);
+ cachedRegionHelper = mock(CachedRegionHelper.class);
securityService = mock(SecurityService.class);
stats = mock(CacheServerStats.class);
when(inetAddress.getHostAddress()).thenReturn("localhost");
when(socket.getInetAddress()).thenReturn(inetAddress);
+
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
+ when(cachedRegionHelper.getCache()).thenReturn(cache);
+ when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+ when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
}
/**
@@ -89,7 +103,7 @@ public class ServerConnectionIntegrationTest {
when(acceptor.getConnectionListener()).thenReturn(mock(ConnectionListener.class));
TestServerConnection testServerConnection =
- new TestServerConnection(socket, cache, mock(CachedRegionHelper.class), stats, 0, 0, null,
+ new TestServerConnection(socket, cache, cachedRegionHelper, stats, 0, 0, null,
CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
assertThatCode(() -> testServerConnection.run()).doesNotThrowAnyException();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 510a37b..427d1a2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR_PP;
import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
+import static org.apache.geode.internal.monitoring.ThreadsMonitoring.Mode.ServerConnectionExecutor;
import java.io.EOFException;
import java.io.IOException;
@@ -65,6 +66,8 @@ import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
import org.apache.geode.internal.cache.tier.sockets.command.Default;
import org.apache.geode.internal.logging.InternalLogWriter;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.AuthorizeRequestPP;
import org.apache.geode.internal.security.SecurityService;
@@ -122,6 +125,13 @@ public abstract class ServerConnection implements Runnable {
private final ProcessingMessageTimer processingMessageTimer = new ProcessingMessageTimer();
+ private final ThreadsMonitoring threadMonitoring;
+ /**
+ * The threadMonitorExecutor for this server connection.
+ * This will be null if acceptor is a selector.
+ */
+ protected final AbstractExecutor threadMonitorExecutor;
+
public static ByteBuffer allocateCommBuffer(int size, Socket sock) {
// I expect that size will almost always be the same value
if (sock.getChannel() == null) {
@@ -304,6 +314,16 @@ public abstract class ServerConnection implements Runnable {
logger.debug("While creating server connection", e);
}
}
+ threadMonitoring = getCache().getInternalDistributedSystem().getDM().getThreadMonitoring();
+ if (getAcceptor().isSelector()) {
+ // When a selector is used, the thread pool that is used to process client requests
+ // will automatically register with the thread monitor the thread that is processing
+ // the request. So no need to create a threadMonitorExecutor.
+ threadMonitorExecutor = null;
+ } else {
+ threadMonitorExecutor = threadMonitoring.createAbstractExecutor(ServerConnectionExecutor);
+ suspendThreadMonitoring();
+ }
}
public Acceptor getAcceptor() {
@@ -778,6 +798,7 @@ public abstract class ServerConnection implements Runnable {
}
ThreadState threadState = null;
+ resumeThreadMonitoring();
try {
if (message != null) {
// Since this thread is not interrupted when the cache server is shutdown, test again after
@@ -847,6 +868,7 @@ public abstract class ServerConnection implements Runnable {
command.execute(message, this, securityService);
}
} finally {
+ suspendThreadMonitoring();
// Keep track of the fact that a message is no longer being
// processed.
serverConnectionCollection.connectionsProcessing.decrementAndGet();
@@ -858,6 +880,18 @@ public abstract class ServerConnection implements Runnable {
}
}
+ private void suspendThreadMonitoring() {
+ if (threadMonitorExecutor != null) {
+ threadMonitorExecutor.suspendMonitoring();
+ }
+ }
+
+ private void resumeThreadMonitoring() {
+ if (threadMonitorExecutor != null) {
+ threadMonitorExecutor.resumeMonitoring();
+ }
+ }
+
private final Object terminationLock = new Object();
private boolean terminated;
@@ -1212,6 +1246,7 @@ public abstract class ServerConnection implements Runnable {
}
}
} else {
+ threadMonitoring.register(threadMonitorExecutor);
try {
while (processMessages && !crHelper.isShutdown()) {
try {
@@ -1224,6 +1259,7 @@ public abstract class ServerConnection implements Runnable {
}
}
} finally {
+ threadMonitoring.unregister(threadMonitorExecutor);
try {
unsetRequestSpecificTimeout();
handleTermination();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
index aa5ba6d..2210aed 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoring.java
@@ -28,7 +28,8 @@ public interface ThreadsMonitoring {
OneTaskOnlyExecutor,
ScheduledThreadExecutor,
AGSExecutor,
- P2PReaderExecutor
+ P2PReaderExecutor,
+ ServerConnectionExecutor
};
Map<Long, AbstractExecutor> getMonitorMap();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
index 1c0e49c..a2946f9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImpl.java
@@ -30,6 +30,7 @@ import org.apache.geode.internal.monitoring.executor.P2PReaderExecutorGroup;
import org.apache.geode.internal.monitoring.executor.PooledExecutorGroup;
import org.apache.geode.internal.monitoring.executor.ScheduledThreadPoolExecutorWKAGroup;
import org.apache.geode.internal.monitoring.executor.SerialQueuedExecutorGroup;
+import org.apache.geode.internal.monitoring.executor.ServerConnectionExecutorGroup;
public class ThreadsMonitoringImpl implements ThreadsMonitoring {
@@ -134,6 +135,8 @@ public class ThreadsMonitoringImpl implements ThreadsMonitoring {
return new GatewaySenderEventProcessorGroup();
case P2PReaderExecutor:
return new P2PReaderExecutorGroup();
+ case ServerConnectionExecutor:
+ return new ServerConnectionExecutorGroup();
default:
throw new IllegalStateException("Unhandled mode=" + mode);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
index 6ee9909..3e56de5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
@@ -14,30 +14,10 @@
*/
package org.apache.geode.internal.monitoring.executor;
-public class P2PReaderExecutorGroup extends AbstractExecutor {
-
+public class P2PReaderExecutorGroup extends SuspendableExecutor {
public static final String GROUP_NAME = "P2PReaderExecutor";
- private volatile boolean suspended;
-
public P2PReaderExecutorGroup() {
super(GROUP_NAME);
}
-
- @Override
- public void suspendMonitoring() {
- suspended = true;
- }
-
- @Override
- public void resumeMonitoring() {
- setStartTime(0);
- suspended = false;
- }
-
- @Override
- public boolean isMonitoringSuspended() {
- return suspended;
- }
-
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroup.java
similarity index 65%
copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
copy to geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroup.java
index 6ee9909..3dd4bb2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroup.java
@@ -14,30 +14,10 @@
*/
package org.apache.geode.internal.monitoring.executor;
-public class P2PReaderExecutorGroup extends AbstractExecutor {
+public class ServerConnectionExecutorGroup extends SuspendableExecutor {
+ public static final String GROUP_NAME = "ServerConnectionExecutor";
- public static final String GROUP_NAME = "P2PReaderExecutor";
-
- private volatile boolean suspended;
-
- public P2PReaderExecutorGroup() {
+ public ServerConnectionExecutorGroup() {
super(GROUP_NAME);
}
-
- @Override
- public void suspendMonitoring() {
- suspended = true;
- }
-
- @Override
- public void resumeMonitoring() {
- setStartTime(0);
- suspended = false;
- }
-
- @Override
- public boolean isMonitoringSuspended() {
- return suspended;
- }
-
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
similarity index 85%
copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
copy to geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
index 6ee9909..9e4849b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutor.java
@@ -14,14 +14,11 @@
*/
package org.apache.geode.internal.monitoring.executor;
-public class P2PReaderExecutorGroup extends AbstractExecutor {
-
- public static final String GROUP_NAME = "P2PReaderExecutor";
-
+public abstract class SuspendableExecutor extends AbstractExecutor {
private volatile boolean suspended;
- public P2PReaderExecutorGroup() {
- super(GROUP_NAME);
+ public SuspendableExecutor(String groupName) {
+ super(groupName);
}
@Override
@@ -39,5 +36,4 @@ public class P2PReaderExecutorGroup extends AbstractExecutor {
public boolean isMonitoringSuspended() {
return suspended;
}
-
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 1c67e01..46a17c7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -28,6 +28,7 @@ import java.net.Socket;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
@@ -36,10 +37,13 @@ import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
-import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.cache.client.protocol.exception.ServiceLoadingFailureException;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -61,6 +65,26 @@ public class ServerConnectionFactoryTest {
@Rule
public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+ private InternalCacheForClientAccess cache;
+ private CachedRegionHelper cachedRegionHelper;
+
+ @Before
+ public void setUp() throws IOException {
+ cache = mock(InternalCacheForClientAccess.class);
+ cachedRegionHelper = mock(CachedRegionHelper.class);
+ }
+
+ private void setupThreadMonitoring() {
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
+ when(cachedRegionHelper.getCache()).thenReturn(cache);
+ when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+ when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
+ }
+
/**
* Safeguard that we won't create the new client protocol object unless the feature flag is
* enabled.
@@ -69,7 +93,7 @@ public class ServerConnectionFactoryTest {
public void newClientProtocolFailsWithoutSystemPropertySet() {
Throwable thrown = catchThrowable(
() -> new ServerConnectionFactory().makeServerConnection(mock(Socket.class),
- mock(InternalCache.class), mock(CachedRegionHelper.class), mock(CacheServerStats.class),
+ cache, cachedRegionHelper, mock(CacheServerStats.class),
0, 0, "", CommunicationMode.ProtobufClientServerProtocol.getModeNumber(),
mock(AcceptorImpl.class), mock(SecurityService.class)));
@@ -81,7 +105,7 @@ public class ServerConnectionFactoryTest {
System.setProperty("geode.feature-protobuf-protocol", "true");
Throwable thrown = catchThrowable(
() -> new ServerConnectionFactory().makeServerConnection(mock(Socket.class),
- mock(InternalCache.class), mock(CachedRegionHelper.class), mock(CacheServerStats.class),
+ cache, cachedRegionHelper, mock(CacheServerStats.class),
0, 0, "", CommunicationMode.ProtobufClientServerProtocol.getModeNumber(),
mock(AcceptorImpl.class), mock(SecurityService.class)));
@@ -97,10 +121,11 @@ public class ServerConnectionFactoryTest {
Socket socket = mock(Socket.class);
when(socket.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
when(socket.getInputStream()).thenReturn(mock(InputStream.class));
+ setupThreadMonitoring();
ServerConnection serverConnection =
- new ServerConnectionFactory().makeServerConnection(socket, mock(InternalCache.class),
- mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
+ new ServerConnectionFactory().makeServerConnection(socket,
+ cache, cachedRegionHelper, mock(CacheServerStats.class), 0, 0, "",
communicationMode.getModeNumber(),
mock(AcceptorImpl.class), mock(SecurityService.class));
@@ -118,10 +143,11 @@ public class ServerConnectionFactoryTest {
Socket socket = mock(Socket.class);
when(socket.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
when(socket.getInputStream()).thenReturn(mock(InputStream.class));
+ setupThreadMonitoring();
ServerConnection serverConnection =
- new ServerConnectionFactory().makeServerConnection(socket, mock(InternalCache.class),
- mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
+ new ServerConnectionFactory().makeServerConnection(socket, cache,
+ cachedRegionHelper, mock(CacheServerStats.class), 0, 0, "",
communicationMode.getModeNumber(),
mock(AcceptorImpl.class), mock(SecurityService.class));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index d2b0671..a8de093 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -36,11 +36,14 @@ import org.junit.experimental.categories.Category;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
-import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.Encryptor;
import org.apache.geode.internal.cache.tier.ServerSideHandshake;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -68,10 +71,20 @@ public class ServerConnectionTest {
when(inetAddress.getHostAddress()).thenReturn("localhost");
when(socket.getInetAddress()).thenReturn(inetAddress);
+ InternalCacheForClientAccess cache = mock(InternalCacheForClientAccess.class);
+ CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
+
+ when(cachedRegionHelper.getCache()).thenReturn(cache);
+ when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+ when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
serverConnection =
- new ServerConnectionFactory().makeServerConnection(socket, mock(InternalCache.class),
- mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, null,
+ new ServerConnectionFactory().makeServerConnection(socket, cache,
+ cachedRegionHelper, mock(CacheServerStats.class), 0, 0, null,
CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
mock(SecurityService.class));
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
index 7840fbb..78af92a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringImplJUnitTest.java
@@ -62,6 +62,7 @@ public class ThreadsMonitoringImplJUnitTest {
assertTrue(threadsMonitoringImpl.startMonitor(Mode.ScheduledThreadExecutor));
assertTrue(threadsMonitoringImpl.startMonitor(Mode.AGSExecutor));
assertTrue(threadsMonitoringImpl.startMonitor(Mode.P2PReaderExecutor));
+ assertTrue(threadsMonitoringImpl.startMonitor(Mode.ServerConnectionExecutor));
}
@Test
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
index 3827308..ff6f8ac 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/ThreadsMonitoringJUnitTest.java
@@ -35,11 +35,12 @@ public class ThreadsMonitoringJUnitTest {
OneTaskOnlyExecutor,
ScheduledThreadExecutor,
AGSExecutor,
- P2PReaderExecutor
+ P2PReaderExecutor,
+ ServerConnectionExecutor
};
- public final int numberOfElements = 7;
+ public final int numberOfElements = 8;
private static final Logger logger = LogService.getLogger();
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
index c266c4c..5a4aac2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
@@ -25,29 +25,4 @@ public class P2PReaderExecutorGroupTest {
assertThat(new P2PReaderExecutorGroup().getGroupName())
.isEqualTo(P2PReaderExecutorGroup.GROUP_NAME);
}
-
- @Test
- public void verifySuspendLifecycle() {
- P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
- assertThat(executor.isMonitoringSuspended()).isFalse();
- executor.suspendMonitoring();
- assertThat(executor.isMonitoringSuspended()).isTrue();
- executor.suspendMonitoring();
- assertThat(executor.isMonitoringSuspended()).isTrue();
- executor.resumeMonitoring();
- assertThat(executor.isMonitoringSuspended()).isFalse();
- executor.resumeMonitoring();
- assertThat(executor.isMonitoringSuspended()).isFalse();
- executor.suspendMonitoring();
- assertThat(executor.isMonitoringSuspended()).isTrue();
- }
-
- @Test
- public void verifyResumeClearsStartTime() {
- P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
- executor.setStartTime(1);
- assertThat(executor.getStartTime()).isEqualTo(1);
- executor.resumeMonitoring();
- assertThat(executor.getStartTime()).isEqualTo(0);
- }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroupTest.java
similarity index 64%
copy from geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
copy to geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroupTest.java
index 6ee9909..5dc99f0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroup.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/ServerConnectionExecutorGroupTest.java
@@ -14,30 +14,15 @@
*/
package org.apache.geode.internal.monitoring.executor;
-public class P2PReaderExecutorGroup extends AbstractExecutor {
+import static org.assertj.core.api.Assertions.assertThat;
- public static final String GROUP_NAME = "P2PReaderExecutor";
+import org.junit.Test;
- private volatile boolean suspended;
+public class ServerConnectionExecutorGroupTest {
- public P2PReaderExecutorGroup() {
- super(GROUP_NAME);
+ @Test
+ public void testVerifyGroupName() {
+ assertThat(new ServerConnectionExecutorGroup().getGroupName())
+ .isEqualTo(ServerConnectionExecutorGroup.GROUP_NAME);
}
-
- @Override
- public void suspendMonitoring() {
- suspended = true;
- }
-
- @Override
- public void resumeMonitoring() {
- setStartTime(0);
- suspended = false;
- }
-
- @Override
- public boolean isMonitoringSuspended() {
- return suspended;
- }
-
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
similarity index 83%
copy from geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
copy to geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
index c266c4c..5cc9afb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/P2PReaderExecutorGroupTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/monitoring/executor/SuspendableExecutorTest.java
@@ -18,17 +18,17 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Test;
-public class P2PReaderExecutorGroupTest {
+public class SuspendableExecutorTest {
- @Test
- public void testVerifyGroupName() {
- assertThat(new P2PReaderExecutorGroup().getGroupName())
- .isEqualTo(P2PReaderExecutorGroup.GROUP_NAME);
+ public static class FakeSuspendableExecutor extends SuspendableExecutor {
+ public FakeSuspendableExecutor() {
+ super("FakeSuspendableExecutor");
+ }
}
@Test
public void verifySuspendLifecycle() {
- P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
+ SuspendableExecutor executor = new FakeSuspendableExecutor();
assertThat(executor.isMonitoringSuspended()).isFalse();
executor.suspendMonitoring();
assertThat(executor.isMonitoringSuspended()).isTrue();
@@ -44,7 +44,7 @@ public class P2PReaderExecutorGroupTest {
@Test
public void verifyResumeClearsStartTime() {
- P2PReaderExecutorGroup executor = new P2PReaderExecutorGroup();
+ SuspendableExecutor executor = new FakeSuspendableExecutor();
executor.setStartTime(1);
assertThat(executor.getStartTime()).isEqualTo(1);
executor.resumeMonitoring();
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
index a3996f8..779a351 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
@@ -35,10 +35,13 @@ import org.junit.experimental.categories.Category;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -64,9 +67,15 @@ public class OutputCapturingServerConnectionTest {
socket = mock(Socket.class);
cache = mock(InternalCacheForClientAccess.class);
cachedRegionHelper = mock(CachedRegionHelper.class);
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
when(acceptor.getClientHealthMonitor()).thenReturn(mock(ClientHealthMonitor.class));
when(cachedRegionHelper.getCache()).thenReturn(cache);
+ when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+ when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
when(socket.getInetAddress()).thenReturn(mock(InetAddress.class));
when(socket.getOutputStream()).thenReturn(new ByteArrayOutputStream());
when(socket.getRemoteSocketAddress()).thenReturn(createUnresolved("localhost", 9071));
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
index b408d4f..5b74cc3 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
@@ -38,11 +38,14 @@ import org.mockito.ArgumentCaptor;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -65,18 +68,24 @@ public class ProtobufServerConnectionTest {
cachedRegionHelper = mock(CachedRegionHelper.class);
clientHealthMonitor = mock(ClientHealthMonitor.class);
socket = mock(Socket.class);
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ DistributionManager distributionManager = mock(DistributionManager.class);
+ ThreadsMonitoring threadsMonitoring = mock(ThreadsMonitoring.class);
when(acceptor.getClientHealthMonitor()).thenReturn(clientHealthMonitor);
when(socket.getInetAddress()).thenReturn(mock(InetAddress.class));
when(socket.getOutputStream()).thenReturn(new ByteArrayOutputStream());
when(socket.getRemoteSocketAddress()).thenReturn(createUnresolved("localhost", 9071));
+ when(cachedRegionHelper.getCache()).thenReturn(cache);
+ when(cache.getInternalDistributedSystem()).thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getDM()).thenReturn(distributionManager);
+ when(distributionManager.getThreadMonitoring()).thenReturn(threadsMonitoring);
}
@Test
public void doOneMessageUnsetsProcessMessagesFlag() throws Exception {
ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
doThrow(new IOException("throw me")).when(clientProtocolProcessor).processMessage(any(), any());
- when(cachedRegionHelper.getCache()).thenReturn(cache);
ServerConnection serverConnection = new ProtobufServerConnection(socket, cache,
cachedRegionHelper, mock(CacheServerStats.class), 0, 1024, "",
CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptor,
@@ -133,7 +142,6 @@ public class ProtobufServerConnectionTest {
@Test
public void doOneMessageNotifiesClientHealthMonitorOfPing() throws IOException {
- when(cachedRegionHelper.getCache()).thenReturn(cache);
ServerConnection serverConnection = new ProtobufServerConnection(socket, cache,
cachedRegionHelper, mock(CacheServerStats.class), 0, 1024, "",
CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptor,