You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2018/08/16 03:34:47 UTC

[geode] branch develop updated: GEODE-5248: Fixes in GatewayReceiverMBeanBridge

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5e73f06  GEODE-5248: Fixes in GatewayReceiverMBeanBridge
5e73f06 is described below

commit 5e73f062a1fca05bb9002f7a7ef9ad8b9265e49f
Author: Juan Jose Ramos Cassella <ju...@gmail.com>
AuthorDate: Fri Jul 27 13:06:10 2018 +0100

    GEODE-5248: Fixes in GatewayReceiverMBeanBridge
    
    The class `GatewayReceiverMBeanBridge` uses the `MBeanStatsMonitor` to
    retrieve the number of senders currently connected. This statistic
    tends to be out of date, as it heavily depends on the `HostStatSampler`
    class running periodically to update the data. Whenever a receiver is
    stopped, its stats are also closed and, as such, the
    `clientConnectionCount` is no longer updated, so the reported count of
    connected gateway senders is wrong.
    
    - Fixed some minor warnings.
    - Added tests to verify the accuracy of the reported count of
      currently connected gateway senders.
    - Renamed `WanCommandListDUnitTest` to `ListGatewaysCommandDUnitTest`.
    - Modified the `GatewayReceiverMBeanBridge` to retrieve the current
      client connection count from the underlying acceptor instead of
      the `MBeanStatsMonitor` class.
---
 .../bean/stats/GatewayReceiverStatsJUnitTest.java  |  5 --
 .../internal/beans/GatewayReceiverMBeanBridge.java | 23 +++---
 ...Test.java => ListGatewaysCommandDUnitTest.java} | 95 +++++++++++++++++++---
 .../StatusGatewayReceiverCommandDUnitTest.java     | 39 +++++----
 .../cache/wan/wancommand/WANCommandUtils.java      |  2 +-
 5 files changed, 114 insertions(+), 50 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java
index a9c568f..f87a054 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/bean/stats/GatewayReceiverStatsJUnitTest.java
@@ -65,7 +65,6 @@ public class GatewayReceiverStatsJUnitTest extends MBeanStatsTestCase {
     assertEquals(1, getCurrentClients());
     assertEquals(1, getConnectionThreads());
     assertEquals(1, getThreadQueueSize());
-    assertEquals(1, getClientConnectionCount());
     assertEquals(1, getTotalFailedConnectionAttempts());
     assertEquals(1, getTotalConnectionsTimedOut());
     assertEquals(20, getTotalSentBytes());
@@ -191,10 +190,6 @@ public class GatewayReceiverStatsJUnitTest extends MBeanStatsTestCase {
     return bridge.getTotalReceivedBytes();
   }
 
-  private int getClientConnectionCount() {
-    return bridge.getClientConnectionCount();
-  }
-
   private int getCurrentClients() {
     return bridge.getCurrentClients();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
index 5f90978..195f138 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
@@ -48,7 +48,7 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
     initializeReceiverStats();
   }
 
-  protected void destroyServer() {
+  void destroyServer() {
     removeServer();
   }
 
@@ -84,12 +84,6 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
     return rcv.getPort();
   }
 
-
-  public String getReceiverId() {
-    return null;
-  }
-
-
   public int getSocketBufferSize() {
     return rcv.getSocketBufferSize();
   }
@@ -185,13 +179,20 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
     return eventsReceivedRate.getRate();
   }
 
+  @Override
+  public int getClientConnectionCount() {
+    // See GEODE-5248: we can't rely on ServerBridge as the HostStatSampler might not have ran
+    // between the last statistical update and the time at which this method is called.
+    return (!isRunning()) ? 0
+        : ((CacheServerImpl) rcv.getServer()).getAcceptor().getClientServerCnxCount();
+  }
 
-  public String[] getConnectedGatewaySenders() {
-    Set<String> uniqueIds = null;
+  String[] getConnectedGatewaySenders() {
+    Set<String> uniqueIds;
     AcceptorImpl acceptor = ((CacheServerImpl) rcv.getServer()).getAcceptor();
     Set<ServerConnection> serverConnections = acceptor.getAllServerConnections();
     if (serverConnections != null && serverConnections.size() > 0) {
-      uniqueIds = new HashSet<String>();
+      uniqueIds = new HashSet<>();
       for (ServerConnection conn : serverConnections) {
         uniqueIds.add(conn.getMembershipID());
       }
@@ -201,7 +202,7 @@ public class GatewayReceiverMBeanBridge extends ServerBridge {
     return new String[0];
   }
 
-  public long getAverageBatchProcessingTime() {
+  long getAverageBatchProcessingTime() {
     if (getStatistic(StatsKey.TOTAL_BATCHES).longValue() != 0) {
       long processTimeInNano = getStatistic(StatsKey.BATCH_PROCESS_TIME).longValue()
           / getStatistic(StatsKey.TOTAL_BATCHES).longValue();
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandListDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ListGatewaysCommandDUnitTest.java
similarity index 81%
rename from geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandListDUnitTest.java
rename to geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ListGatewaysCommandDUnitTest.java
index fce5e51..ce6491a 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WanCommandListDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ListGatewaysCommandDUnitTest.java
@@ -23,6 +23,8 @@ import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.get
 import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
 import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
 import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateMemberMXBeanProxy;
+import static org.apache.geode.management.MXBeanAwaitility.await;
+import static org.apache.geode.management.MXBeanAwaitility.awaitGatewayReceiverMXBeanProxy;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.Serializable;
@@ -34,6 +36,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.management.GatewayReceiverMXBean;
 import org.apache.geode.management.cli.Result;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
 import org.apache.geode.management.internal.cli.result.CommandResult;
@@ -46,7 +49,7 @@ import org.apache.geode.test.junit.rules.GfshCommandRule;
 
 @Category({WanTest.class})
 @SuppressWarnings("serial")
-public class WanCommandListDUnitTest implements Serializable {
+public class ListGatewaysCommandDUnitTest implements Serializable {
 
   @Rule
   public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(8);
@@ -77,7 +80,7 @@ public class WanCommandListDUnitTest implements Serializable {
   }
 
   @Test
-  public void testListGatewayWithNoSenderReceiver() throws Exception {
+  public void testListGatewayWithNoSenderReceiver() {
     Integer lnPort = locatorSite1.getPort();
 
     // setup servers in Site #1 (London)
@@ -96,9 +99,9 @@ public class WanCommandListDUnitTest implements Serializable {
   }
 
   @Test
-  public void testListGatewaySender() throws Exception {
-    Integer lnPort = locatorSite1.getPort();
-    Integer nyPort = locatorSite2.getPort();
+  public void testListGatewaySender() {
+    int lnPort = locatorSite1.getPort();
+    int nyPort = locatorSite2.getPort();
 
     // setup servers in Site #1 (London)
     server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -157,9 +160,9 @@ public class WanCommandListDUnitTest implements Serializable {
   }
 
   @Test
-  public void testListGatewayReceiver() throws Exception {
-    Integer lnPort = locatorSite1.getPort();
-    Integer nyPort = locatorSite2.getPort();
+  public void testListGatewayReceiver() {
+    int lnPort = locatorSite1.getPort();
+    int nyPort = locatorSite2.getPort();
 
     // setup servers in Site #1 (London)
     server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -206,7 +209,7 @@ public class WanCommandListDUnitTest implements Serializable {
   }
 
   @Test
-  public void testListGatewaySenderGatewayReceiver() throws Exception {
+  public void testListGatewaySenderGatewayReceiver() {
     Integer lnPort = locatorSite1.getPort();
     Integer nyPort = locatorSite2.getPort();
 
@@ -277,9 +280,9 @@ public class WanCommandListDUnitTest implements Serializable {
   }
 
   @Test
-  public void testListGatewaySenderGatewayReceiver_group() throws Exception {
-    Integer lnPort = locatorSite1.getPort();
-    Integer nyPort = locatorSite2.getPort();
+  public void testListGatewaySenderGatewayReceiver_group() {
+    int lnPort = locatorSite1.getPort();
+    int nyPort = locatorSite2.getPort();
 
     // setup servers in Site #1 (London)
     server1 = startServerWithGroups(3, "Serial_Sender, Parallel_Sender", lnPort);
@@ -407,9 +410,75 @@ public class WanCommandListDUnitTest implements Serializable {
     assertThat(ports).hasSize(1);
   }
 
-  private MemberVM startServerWithGroups(int index, String groups, int locPort) throws Exception {
+  private MemberVM startServerWithGroups(int index, String groups, int locPort) {
     Properties props = new Properties();
     props.setProperty(GROUPS, groups);
     return clusterStartupRule.startServerVM(index, props, locPort);
   }
+
+  @Test
+  public void listGatewaysShouldCorrectlyUpdateSendersConnectedCountWhenReceiverStops()
+      throws Exception {
+    int site1Port = locatorSite1.getPort();
+    int site2Port = locatorSite2.getPort();
+
+    // Setup servers in Site #1
+    server1 = clusterStartupRule.startServerVM(3, site1Port);
+    server1.invoke(() -> createAndStartReceiver(site1Port));
+
+    // Setup servers in Site #2
+    server2 = clusterStartupRule.startServerVM(4, site2Port);
+    server2.invoke(() -> createSender("ln_Serial", 1, false, 100, 400, false, false, null, false));
+
+    // Check Gateways
+    locatorSite2.invoke(() -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()),
+        "ln_Serial", true, false));
+    locatorSite1.invoke(() -> {
+      GatewayReceiverMXBean gatewayReceiverMXBean =
+          awaitGatewayReceiverMXBeanProxy(getMember(server1.getVM()));
+      assertThat(gatewayReceiverMXBean).isNotNull();
+      await("Awaiting GatewayReceiverMXBean.isRunning(true)")
+          .until(() -> assertThat(gatewayReceiverMXBean.isRunning()).isTrue());
+      await("Awaiting GatewayReceiverMXBean.getClientConnectionCount() > 0")
+          .until(() -> assertThat(gatewayReceiverMXBean.getClientConnectionCount()).isPositive());
+    });
+
+    // Verify Results
+    gfsh.connect(locatorSite1);
+    CommandResult listGatewaysCommandResult = gfsh.executeCommand(CliStrings.LIST_GATEWAY);
+    assertThat(listGatewaysCommandResult).isNotNull();
+    assertThat(listGatewaysCommandResult.getStatus()).isSameAs(Result.Status.OK);
+    TabularResultData gatewayReceiversResultData =
+        ((CompositeResultData) listGatewaysCommandResult.getResultData())
+            .retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER)
+            .retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+    assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_PORT)).hasSize(1);
+    assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER))
+        .hasSize(1);
+    List<String> sendersCount =
+        gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_SENDERS_COUNT);
+    assertThat(sendersCount).hasSize(1).doesNotContain("0");
+    assertThat(((CompositeResultData) listGatewaysCommandResult.getResultData())
+        .retrieveSection(CliStrings.SECTION_GATEWAY_SENDER)).isNull();
+
+    // Stop receivers in Site #1 and Verify Sender Count
+    server1.invoke(WANCommandUtils::stopReceivers);
+
+    locatorSite1
+        .invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
+    gfsh.connect(locatorSite1);
+    listGatewaysCommandResult = gfsh.executeCommand(CliStrings.LIST_GATEWAY);
+    assertThat(listGatewaysCommandResult).isNotNull();
+    assertThat(listGatewaysCommandResult.getStatus()).isSameAs(Result.Status.OK);
+    gatewayReceiversResultData = ((CompositeResultData) listGatewaysCommandResult.getResultData())
+        .retrieveSection(CliStrings.SECTION_GATEWAY_RECEIVER)
+        .retrieveTable(CliStrings.TABLE_GATEWAY_RECEIVER);
+    assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_PORT)).hasSize(1);
+    assertThat(gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_HOST_MEMBER))
+        .hasSize(1);
+    sendersCount = gatewayReceiversResultData.retrieveAllValues(CliStrings.RESULT_SENDERS_COUNT);
+    assertThat(sendersCount).hasSize(1).containsExactly("0");
+    assertThat(((CompositeResultData) listGatewaysCommandResult.getResultData())
+        .retrieveSection(CliStrings.SECTION_GATEWAY_SENDER)).isNull();
+  }
 }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java
index a9eef0b..ffd000e 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StatusGatewayReceiverCommandDUnitTest.java
@@ -19,7 +19,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.GROUPS;
 import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
 import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.createAndStartReceiver;
 import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
-import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.stopReceiver;
 import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -76,9 +75,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
   }
 
   @Test
-  public void testGatewayReceiverStatus() throws Exception {
-    Integer lnPort = locatorSite1.getPort();
-    Integer nyPort = locatorSite2.getPort();
+  public void testGatewayReceiverStatus() {
+    int lnPort = locatorSite1.getPort();
+    int nyPort = locatorSite2.getPort();
 
     // setup servers in Site #1 (London)
     server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -112,9 +111,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
     assertThat(result_Status).hasSize(3);
     assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_NOT_RUNNING);
 
-    server1.invoke(() -> stopReceiver());
-    server2.invoke(() -> stopReceiver());
-    server3.invoke(() -> stopReceiver());
+    server1.invoke(WANCommandUtils::stopReceivers);
+    server2.invoke(WANCommandUtils::stopReceivers);
+    server3.invoke(WANCommandUtils::stopReceivers);
 
     locatorSite1
         .invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
@@ -137,9 +136,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
   }
 
   @Test
-  public void testGatewayReceiverStatus_OnMember() throws Exception {
-    Integer lnPort = locatorSite1.getPort();
-    Integer nyPort = locatorSite2.getPort();
+  public void testGatewayReceiverStatus_OnMember() {
+    int lnPort = locatorSite1.getPort();
+    int nyPort = locatorSite2.getPort();
 
     // setup servers in Site #1 (London)
     server1 = clusterStartupRule.startServerVM(3, lnPort);
@@ -175,9 +174,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
     assertThat(result_Status).hasSize(1);
     assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_NOT_RUNNING);
 
-    server1.invoke(() -> stopReceiver());
-    server2.invoke(() -> stopReceiver());
-    server3.invoke(() -> stopReceiver());
+    server1.invoke(WANCommandUtils::stopReceivers);
+    server2.invoke(WANCommandUtils::stopReceivers);
+    server3.invoke(WANCommandUtils::stopReceivers);
 
     locatorSite1
         .invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
@@ -200,9 +199,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
   }
 
   @Test
-  public void testGatewayReceiverStatus_OnGroups() throws Exception {
-    Integer lnPort = locatorSite1.getPort();
-    Integer nyPort = locatorSite2.getPort();
+  public void testGatewayReceiverStatus_OnGroups() {
+    int lnPort = locatorSite1.getPort();
+    int nyPort = locatorSite2.getPort();
 
     // setup servers in Site #1 (London)
     server1 = startServerWithGroups(3, "RG1, RG2", lnPort);
@@ -239,9 +238,9 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
     assertThat(result_Status).hasSize(3);
     assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_NOT_RUNNING);
 
-    server1.invoke(() -> stopReceiver());
-    server2.invoke(() -> stopReceiver());
-    server3.invoke(() -> stopReceiver());
+    server1.invoke(WANCommandUtils::stopReceivers);
+    server2.invoke(WANCommandUtils::stopReceivers);
+    server3.invoke(WANCommandUtils::stopReceivers);
 
     locatorSite1
         .invoke(() -> validateGatewayReceiverMXBeanProxy(getMember(server1.getVM()), false));
@@ -263,7 +262,7 @@ public class StatusGatewayReceiverCommandDUnitTest implements Serializable {
     assertThat(result_Status).doesNotContain(CliStrings.GATEWAY_RUNNING);
   }
 
-  private MemberVM startServerWithGroups(int index, String groups, int locPort) throws Exception {
+  private MemberVM startServerWithGroups(int index, String groups, int locPort) {
     Properties props = new Properties();
     props.setProperty(GROUPS, groups);
     return clusterStartupRule.startServerVM(index, props, locPort);
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java
index 1c50616..316a784 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/WANCommandUtils.java
@@ -243,7 +243,7 @@ public class WANCommandUtils implements Serializable {
     }
   }
 
-  public static void stopReceiver() {
+  public static void stopReceivers() {
     Cache cache = ClusterStartupRule.getCache();
     Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
     for (GatewayReceiver receiver : receivers) {