You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2017/11/21 20:50:30 UTC

[geode] branch develop updated: GEODE-3788: add utility methods to get the async event queues in the … (#1083)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 10dc0a2  GEODE-3788: add utility methods to get the async event queues in the … (#1083)
10dc0a2 is described below

commit 10dc0a212eb9d9813e13fd02e8436ac9d8b0ae37
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Tue Nov 21 12:50:28 2017 -0800

    GEODE-3788: add utility methods to get the async event queues in the … (#1083)
    
    * GEODE-3788: add utility methods to get the async event queues in the running cluster
---
 .../apache/geode/management/ManagementService.java |  5 ++++
 .../geode/management/internal/MBeanJMXAdapter.java |  5 ----
 .../internal/SystemManagementService.java          |  8 ++++++
 .../geode/management/internal/cli/CliUtil.java     | 15 +++++++++++
 .../internal/cli/commands/GfshCommand.java         |  4 +++
 .../management/internal/cli/CliUtilDUnitTest.java  | 30 ++++++++++++++++++++++
 .../apache/geode/test/dunit/rules/MemberVM.java    |  8 ++++++
 .../geode/test/junit/rules/MemberStarterRule.java  | 10 ++++++--
 8 files changed, 78 insertions(+), 7 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
index 887b04a..eb2e808 100755
--- a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
@@ -217,6 +217,11 @@ public abstract class ManagementService {
   public abstract Set<ObjectName> queryMBeanNames(DistributedMember member);
 
   /**
+   * Returns the object names of the async event queue MBeans registered for the given member
+   */
+  public abstract Set<ObjectName> getAsyncEventQueueMBeanNames(DistributedMember member);
+
+  /**
    * Returns an instance of an MBean. This is a reference to the MBean instance and not a
    * {@link ObjectInstance}.
    *
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
index 467e3d1..f3c0fe3 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
@@ -483,11 +483,6 @@ public class MBeanJMXAdapter implements ManagementConstants {
         getMemberNameOrId(member))));
   }
 
-  public static ObjectName getAsyncEventQueueMBeanName(String member, String queueId) {
-    return getObjectName((MessageFormat.format(OBJECTNAME__ASYNCEVENTQUEUE_MXBEAN, queueId,
-        makeCompliantName(member))));
-  }
-
   public static ObjectName getDistributedRegionMbeanName(String regionPath) {
     return getObjectName((MessageFormat.format(OBJECTNAME__DISTRIBUTEDREGION_MXBEAN,
         makeCompliantRegionPath(regionPath))));
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
index 60615db..8f2c9c7 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
@@ -18,6 +18,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
 
 import javax.management.Notification;
 import javax.management.ObjectName;
@@ -358,6 +359,13 @@ public class SystemManagementService extends BaseManagementService {
   }
 
   @Override
+  public Set<ObjectName> getAsyncEventQueueMBeanNames(DistributedMember member) {
+    Set<ObjectName> mBeanNames = this.queryMBeanNames(member);
+    return mBeanNames.stream().filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service")))
+        .collect(Collectors.toSet());
+  }
+
+  @Override
   public ObjectName registerMBean(Object object, ObjectName objectName) {
     verifyManagementService();
     return jmxAdapter.registerMBean(object, objectName, false);
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
index 02eb8b8..8ad2282 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
@@ -60,6 +60,7 @@ import org.apache.geode.management.DistributedRegionMXBean;
 import org.apache.geode.management.ManagementService;
 import org.apache.geode.management.cli.Result;
 import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.management.internal.SystemManagementService;
 import org.apache.geode.management.internal.cli.exceptions.UserErrorException;
 import org.apache.geode.management.internal.cli.i18n.CliStrings;
 import org.apache.geode.management.internal.cli.result.ResultBuilder;
@@ -406,6 +407,20 @@ public class CliUtil {
     return result;
   }
 
+  public static Set<DistributedMember> getMembersWithAsyncEventQueue(InternalCache cache,
+      String queueId) {
+    Set<DistributedMember> members = findMembers(null, null);
+    return members.stream().filter(m -> getAsyncEventQueueIds(cache, m).contains(queueId))
+        .collect(Collectors.toSet());
+  }
+
+  public static Set<String> getAsyncEventQueueIds(InternalCache cache, DistributedMember member) {
+    SystemManagementService managementService =
+        (SystemManagementService) ManagementService.getExistingManagementService(cache);
+    return managementService.getAsyncEventQueueMBeanNames(member).stream()
+        .map(x -> x.getKeyProperty("queue")).collect(Collectors.toSet());
+  }
+
   static class CustomFileFilter implements FileFilter {
     private String extensionWithDot;
 
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
index 134a153..6a30378 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
@@ -183,4 +183,8 @@ public interface GfshCommand extends CommandMarker {
   default Set<DistributedMember> findAnyMembersForRegion(InternalCache cache, String regionPath) {
     return CliUtil.getRegionAssociatedMembers(regionPath, cache, false);
   }
+
+  default Set<DistributedMember> findMembersWithAsyncEventQueue(String queueId) {
+    return CliUtil.getMembersWithAsyncEventQueue(getCache(), queueId);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
index f1050d8..d3b97b8 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
@@ -31,6 +31,7 @@ import org.junit.experimental.categories.Category;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.apache.geode.management.internal.cli.exceptions.UserErrorException;
 import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
@@ -184,6 +185,35 @@ public class CliUtilDUnitTest {
   }
 
 
+  @Test
+  public void getMembersWithQueueId() throws Exception {
+    gfsh.executeAndAssertThat("create async-event-queue --id=queue1 --group=group1 --listener="
+        + MyAsyncEventListener.class.getName()).statusIsSuccess();
+    gfsh.executeAndAssertThat("create async-event-queue --id=queue2 --group=group2 --listener="
+        + MyAsyncEventListener.class.getName()).statusIsSuccess();
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --id=queue --listener=" + MyAsyncEventListener.class.getName())
+        .statusIsSuccess();
+
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue1", 2);
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue2", 2);
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue", 4);
+
+    locator.invoke(() -> {
+      members =
+          CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue1");
+      assertThat(getNames(members)).containsExactlyInAnyOrder("member1", "member2");
+
+      members =
+          CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue2");
+      assertThat(getNames(members)).containsExactlyInAnyOrder("member3", "member4");
+
+      members = CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue");
+      assertThat(getNames(members)).containsExactlyInAnyOrder("member1", "member2", "member3",
+          "member4");
+    });
+  }
+
   private static Set<String> getNames(Set<DistributedMember> members) {
     return members.stream().map(DistributedMember::getName).collect(Collectors.toSet());
   }
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
index 0e9cfbc..4e04b8e 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
@@ -137,4 +137,12 @@ public class MemberVM implements Member {
     vm.invoke(() -> LocatorServerStartupRule.memberStarter.waitTillDiskStoreIsReady(diskstoreName,
         serverCount));
   }
+
+  public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int serverCount) {
+    vm.invoke(() -> {
+      LocatorServerStartupRule.memberStarter.waitTillAsyncEventQueuesAreReadyOnServers(queueId,
+          serverCount);
+    });
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index da0f588..e2dcc9c 100644
--- a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -43,6 +43,7 @@ import org.apache.geode.internal.net.SocketCreatorFactory;
 import org.apache.geode.management.DistributedRegionMXBean;
 import org.apache.geode.management.DistributedSystemMXBean;
 import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.internal.cli.CliUtil;
 import org.apache.geode.security.SecurityManager;
 import org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;
 
@@ -205,9 +206,9 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
       if (properties.containsKey(NAME)) {
         name = properties.getProperty(NAME);
       } else {
-        if (this instanceof ServerStarterRule)
+        if (this instanceof ServerStarterRule) {
           name = "server";
-        else {
+        } else {
           name = "locator";
         }
       }
@@ -263,6 +264,11 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
         .until(() -> getDiskStoreCount(diskstoreName) == serverCount);
   }
 
+  public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int serverCount) {
+    await().atMost(2, TimeUnit.SECONDS).until(
+        () -> CliUtil.getMembersWithAsyncEventQueue(getCache(), queueId).size() == serverCount);
+  }
+
   abstract void stopMember();
 
   @Override

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].