You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2018/08/16 03:34:47 UTC
[geode] branch develop updated: GEODE-5248: Fixes in
GatewayReceiverMBeanBridge
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5e73f06 GEODE-5248: Fixes in GatewayReceiverMBeanBridge
5e73f06 is described below
commit 5e73f062a1fca05bb9002f7a7ef9ad8b9265e49f
Author: Juan Jose Ramos Cassella <ju...@gmail.com>
AuthorDate: Fri Jul 27 13:06:10 2018 +0100
GEODE-5248: Fixes in GatewayReceiverMBeanBridge
The class `GatewayReceiverMBeanBridge` uses the `MBeanStatsMonitor` to
retrieve the number of senders currently connected. This statistic
tends to be out of date because it depends heavily on the `HostStatSampler`
class running periodically to update the data. Whenever a receiver is
stopped, its stats are also closed and, as such, the
`clientConnectionCount` is no longer updated, so a wrong count of
connected gateway-senders is reported.
- Fixed some minor warnings.
- Added tests to verify the accuracy of the count of gateway-senders
currently connected.
- Renamed `WanCommandListDUnitTest` to `ListGatewaysCommandDUnitTest`.
- Modified the `GatewayReceiverMBeanBridge` to retrieve the current
client connection count from the underlying acceptor instead of
the `MBeanStatsMonitor` class.
---
.../bean/stats/GatewayReceiverStatsJUnitTest.java | 5 --
.../internal/beans/GatewayReceiverMBeanBridge.java | 23 +++---
...Test.java => ListGatewaysCommandDUnitTest.java} | 95 +++++++++++++++++++---
.../StatusGatewayReceiverCommandDUnitTest.java | 39 +++++----
.../cache/wan/wancommand/WANCommandUtils.java | 2 +-
5 files changed, 114 insertions(+), 50 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java
index a9c568f..f87a054 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java
@@ -65,7 +65,6 @@ public class GatewayReceiverStatsJUnitTest extends MBeanStatsTestCase {
assertEquals(1, getCurrentClients());
assertEquals(1, getConnectionThreads());
assertEquals(1, getThreadQueueSize());
- assertEquals(1, getClientConnectionCount());
assertEquals(1, getTotalFailedConnectionAttempts());
assertEquals(1, getTotalConnectionsTimedOut());
assertEquals(20, getTotalSentBytes());
@@ -191,10 +190,6 @@ public class GatewayReceiverStatsJUnitTest extends MBeanStatsTestCase {
return bridge.getTotalReceivedBytes();
}
- private int getClientConnectionCount() {
- return bridge.getClientConnectionCount();
- }
-
private int getCurrentClients() {
return bridge.getCurrentClients();
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
index 5f90978..195f138 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
@@ -48,7 +48,7 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
initializeReceiverStats();
}
- protected void destroyServer() {
+ void destroyServer() {
removeServer();
}
@@ -84,12 +84,6 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
return rcv.getPort();
}
-
- public String getReceiverId() {
- return null;
- }
-
-
public int getSocketBufferSize() {
return rcv.getSocketBufferSize();
}
@@ -185,13 +179,20 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
return eventsReceivedRate.getRate();
}
+ @Override
+ public int getClientConnectionCount() {
+ // See GEODE-5248: we can't rely on ServerBridge as the HostStatSampler might not have run
+ // between the last statistical update and the time at which this method is called.
+ return (!isRunning()) ? 0
+ : ((CacheServerImpl) rcv.getServer()).getAcceptor().getClientServerCnxCount();
+ }
- public String[] getConnectedGatewaySenders() {
- Set<String> uniqueIds = null;
+ String[] getConnectedGatewaySenders() {
+ Set<String> uniqueIds;
AcceptorImpl acceptor = ((CacheServerImpl) rcv.getServer()).getAcceptor();
Set<ServerConnection> serverConnections = acceptor.getAllServerConnections();
if (serverConnections != null && serverConnections.size() > 0) {
- uniqueIds = new HashSet<String>();
+ uniqueIds = new HashSet<>();
for (ServerConnection conn : serverConnections) {
uniqueIds.add(conn.getMembershipID());
}
@@ -201,7 +202,7 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
return new String[0];
}
- public long getAverageBatchProcessingTime() {
+ long getAverageBatchProcessingTime() {
if (getStatistic(StatsKey.TOTAL_BATCHES).longValue() != 0) {
long processTimeInNano = getStatistic(StatsKey.BATCH_PROCESS_TIME).longValue()
/ getStatistic(StatsKey.TOTAL_BATCHES).longValue();
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandListDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ListGatewaysCommandDUnitTest.java
similarity index 81%
rename from geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandListDUnitTest.java
rename to geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ListGatewaysCommandDUnitTest.java
index fce5e51..ce6491a 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandListDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ListGatewaysCommandDUnitTest.java
@@ -23,6 +23,8 @@ import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.get
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateMemberMXBeanProxy;
+import static org.apache.geode.management.MXBeanAwaitility.await;
+import static org.apache.geode.management.MXBeanAwaitility.awaitGatewayReceiverMXBeanProxy;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.Serializable;
@@ -34,6 +36,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.geode.management.GatewayReceiverMXBean;
import org.apache.geode.management.cli.Result;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.cli.result.CommandResult;
@@ -46,7 +49,7 @@ import org.apache.geode.test.junit.rules.GfshCommandRule;
@Category({WanTest.class})
@SuppressWarnings("serial")
-public class WanCommandListDUnitTest implements Serializable {
+public class ListGatewaysCommandDUnitTest implements Serializable {
@Rule
public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(8);
@@ -77,7 +80,7 @@ public class WanCommandListDUnitTest implements Serializable {
}
@Test
- public void testListGatewayWithNoSenderReceiver() throws Exception {
+ public void testListGatewayWithNoSenderReceiver() {
Integer lnPort = locatorSite1.getPort();
// setup servers in Site #1 (London)
@@ -96,9 +99,9 @@ public class WanCommandListDUnitTest implements Serializable {
}
@Test
- public void testListGatewaySender() throws Exception {
- Integer lnPort = locatorSite1.getPort();
- Integer nyPort = locatorSite2.getPort();
+ public void testListGatewaySender() {
+ int lnPort = locatorSite1.getPort();
+ int nyPort = locatorSite2.getPort();
// setup servers in Site #1 (London)
server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -157,9 +160,9 @@ public class WanCommandListDUnitTest implements Serializable {
}
@Test
- public void testListGatewayReceiver() throws Exception {
- Integer lnPort = locatorSite1.getPort();
- Integer nyPort = locatorSite2.getPort();
+ public void testListGatewayReceiver() {
+ int lnPort = locatorSite1.getPort();
+ int nyPort = locatorSite2.getPort();
// setup servers in Site #1 (London)
server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -206,7 +209,7 @@ public class WanCommandListDUnitTest implements Serializable {
}
@Test
- public void testListGatewaySenderGatewayReceiver() throws Exception {
+ public void testListGatewaySenderGatewayReceiver() {
Integer lnPort = locatorSite1.getPort();
Integer nyPort = locatorSite2.getPort();
@@ -277,9 +280,9 @@ public class WanCommandListDUnitTest implements Serializable {
}
@Test
- public void testListGatewaySenderGatewayReceiver_group() throws Exception {
- Integer lnPort = locatorSite1.getPort();
- Integer nyPort = locatorSite2.getPort();
+ public void testListGatewaySenderGatewayReceiver_group() {
+ int lnPort = locatorSite1.getPort();
+ int nyPort = locatorSite2.getPort();
// setup servers in Site #1 (London)
server1 = startServerWithGroups(3, "Serial_Sender, Parallel_Sender", lnPort);
@@ -407,9 +410,75 @@ public class WanCommandListDUnitTest implements Serializable {
assertThat(ports).hasSize(1);
}
- private MemberVM startServerWithGroups(int index, String groups, int locPort) throws Exception {
+ private MemberVM startServerWithGroups(int index, String groups, int locPort) {
Properties props = new Properties();
props.setProperty(GROUPS, groups);
return clusterStartupRule.startServerVM(index, props, locPort);
}
+
+ @Test
+ public void listGatewaysShouldCorrectlyUpdateSendersConnectedCountWhenReceiverStops()
+ throws Exception {
+ int site1Port = locatorSite1.getPort();
+ int site2Port = locatorSite2.getPort();
+
+ // Setup servers in Site #1
+ server1 = clusterStartupRule.startServerVM(3, site1Port);
+ server1.invoke(() -> createAndStartReceiver(site1Port));
+
+ // Setup servers in Site #2
+ server2 = clusterStartupRule.startServerVM(4, site2Port);
+ server2.invoke(() -> createSender("ln_Serial", 1, false, 100, 400, false, false, null, false));
+
+ // Check Gateways
+ locatorSite2.invoke(() -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()),
+ "ln_Serial", true, false));
+ locatorSite1.invoke(() -> {
+ GatewayReceiverMXBean gatewayReceiverMXBean =
+ awaitGatewayReceiverMXBeanProxy(getMember(server1.getVM()));
+ assertThat(gatewayReceiverMXBean).isNotNull();
+ await("Awaiting GatewayReceiverMXBean.isRunning(true)")
+ .until(() -> assertThat(gatewayReceiverMXBean.isRunning()).isTrue());
+ await("Awaiting GatewayReceiverMXBean.getClientConnectionCount() > 0")
+ .until(() -> assertThat(gatewayReceiverMXBean.getClientConnectionCount()).isPositive());
+ });
+
+ // Verify Results
+ gfsh.connect(locatorSite1);
+ CommandResult listGatewaysCommandResult = gfsh.executeCommand(CliStrings.LIST_GATEWAY);
+ assertThat(listGatewaysCommandResult).isNotNull();
+ assertThat(listGatewaysCommandResult.getStatus()).isSameAs(Result.Status.OK);
+ TabularResultData gatewayReceiversResultData =
+ ((CompositeResultData) listGatewaysCommandResult.getResultData())
+ .retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER)
+ .retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+ assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_PORT)).hasSize(1);
+ assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER))
+ .hasSize(1);
+ List<String> sendersCount =
+ gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_SENDERS_COUNT);
+ assertThat(sendersCount).hasSize(1).doesNotContain("0");
+ assertThat(((CompositeResultData) listGatewaysCommandResult.getResultData())
+ .retrieveSection(CliStrings.SECTION_GATEWAY_SENDER)).isNull();
+
+ // Stop receivers in Site #1 and Verify Sender Count
+ server1.invoke(WANCommandUtils::stopReceivers);
+
+ locatorSite1
+ .invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
+ gfsh.connect(locatorSite1);
+ listGatewaysCommandResult = gfsh.executeCommand(CliStrings.LIST_GATEWAY);
+ assertThat(listGatewaysCommandResult).isNotNull();
+ assertThat(listGatewaysCommandResult.getStatus()).isSameAs(Result.Status.OK);
+ gatewayReceiversResultData = ((CompositeResultData) listGatewaysCommandResult.getResultData())
+ .retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER)
+ .retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+ assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_PORT)).hasSize(1);
+ assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER))
+ .hasSize(1);
+ sendersCount = gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_SENDERS_COUNT);
+ assertThat(sendersCount).hasSize(1).containsExactly("0");
+ assertThat(((CompositeResultData) listGatewaysCommandResult.getResultData())
+ .retrieveSection(CliStrings.SECTION_GATEWAY_SENDER)).isNull();
+ }
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java
index a9eef0b..ffd000e 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java
@@ -19,7 +19,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.GROUPS;
import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.createAndStartReceiver;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
-import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.stopReceiver;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
import static org.assertj.core.api.Assertions.assertThat;
@@ -76,9 +75,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
}
@Test
- public void testGatewayReceiverStatus() throws Exception {
- Integer lnPort = locatorSite1.getPort();
- Integer nyPort = locatorSite2.getPort();
+ public void testGatewayReceiverStatus() {
+ int lnPort = locatorSite1.getPort();
+ int nyPort = locatorSite2.getPort();
// setup servers in Site #1 (London)
server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -112,9 +111,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
assertThat(result_Status).hasSize(3);
assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_NOT_RUNNING);
- server1.invoke(() -> stopReceiver());
- server2.invoke(() -> stopReceiver());
- server3.invoke(() -> stopReceiver());
+ server1.invoke(WANCommandUtils::stopReceivers);
+ server2.invoke(WANCommandUtils::stopReceivers);
+ server3.invoke(WANCommandUtils::stopReceivers);
locatorSite1
.invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
@@ -137,9 +136,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
}
@Test
- public void testGatewayReceiverStatus_OnMember() throws Exception {
- Integer lnPort = locatorSite1.getPort();
- Integer nyPort = locatorSite2.getPort();
+ public void testGatewayReceiverStatus_OnMember() {
+ int lnPort = locatorSite1.getPort();
+ int nyPort = locatorSite2.getPort();
// setup servers in Site #1 (London)
server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -175,9 +174,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
assertThat(result_Status).hasSize(1);
assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_NOT_RUNNING);
- server1.invoke(() -> stopReceiver());
- server2.invoke(() -> stopReceiver());
- server3.invoke(() -> stopReceiver());
+ server1.invoke(WANCommandUtils::stopReceivers);
+ server2.invoke(WANCommandUtils::stopReceivers);
+ server3.invoke(WANCommandUtils::stopReceivers);
locatorSite1
.invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
@@ -200,9 +199,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
}
@Test
- public void testGatewayReceiverStatus_OnGroups() throws Exception {
- Integer lnPort = locatorSite1.getPort();
- Integer nyPort = locatorSite2.getPort();
+ public void testGatewayReceiverStatus_OnGroups() {
+ int lnPort = locatorSite1.getPort();
+ int nyPort = locatorSite2.getPort();
// setup servers in Site #1 (London)
server1 = startServerWithGroups(3, "RG1, RG2", lnPort);
@@ -239,9 +238,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
assertThat(result_Status).hasSize(3);
assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_NOT_RUNNING);
- server1.invoke(() -> stopReceiver());
- server2.invoke(() -> stopReceiver());
- server3.invoke(() -> stopReceiver());
+ server1.invoke(WANCommandUtils::stopReceivers);
+ server2.invoke(WANCommandUtils::stopReceivers);
+ server3.invoke(WANCommandUtils::stopReceivers);
locatorSite1
.invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
@@ -263,7 +262,7 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_RUNNING);
}
- private MemberVM startServerWithGroups(int index, String groups, int locPort) throws Exception {
+ private MemberVM startServerWithGroups(int index, String groups, int locPort) {
Properties props = new Properties();
props.setProperty(GROUPS, groups);
return clusterStartupRule.startServerVM(index, props, locPort);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java
index 1c50616..316a784 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java
@@ -243,7 +243,7 @@ public class WANCommandUtils implements Serializable {
}
}
- public static void stopReceiver() {
+ public static void stopReceivers() {
Cache cache = ClusterStartupRule.getCache();
Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
for (GatewayReceiver receiver : receivers) {