You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2017/11/21 20:50:30 UTC
[geode] branch develop updated: GEODE-3788: add utility methods to get the async event queues in the … (#1083)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 10dc0a2 GEODE-3788: add utility methods to get the async event queues in the … (#1083)
10dc0a2 is described below
commit 10dc0a212eb9d9813e13fd02e8436ac9d8b0ae37
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Tue Nov 21 12:50:28 2017 -0800
GEODE-3788: add utility methods to get the async event queues in the … (#1083)
* GEODE-3788: add utility methods to get the async event queues in the running cluster
---
.../apache/geode/management/ManagementService.java | 5 ++++
.../geode/management/internal/MBeanJMXAdapter.java | 5 ----
.../internal/SystemManagementService.java | 8 ++++++
.../geode/management/internal/cli/CliUtil.java | 15 +++++++++++
.../internal/cli/commands/GfshCommand.java | 4 +++
.../management/internal/cli/CliUtilDUnitTest.java | 30 ++++++++++++++++++++++
.../apache/geode/test/dunit/rules/MemberVM.java | 8 ++++++
.../geode/test/junit/rules/MemberStarterRule.java | 10 ++++++--
8 files changed, 78 insertions(+), 7 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
index 887b04a..eb2e808 100755
--- a/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/ManagementService.java
@@ -217,6 +217,11 @@ public abstract class ManagementService {
public abstract Set<ObjectName> queryMBeanNames(DistributedMember member);
/**
+ * Returns the ids of the async event queues on this member
+ */
+ public abstract Set<ObjectName> getAsyncEventQueueMBeanNames(DistributedMember member);
+
+ /**
* Returns an instance of an MBean. This is a reference to the MBean instance and not a
* {@link ObjectInstance}.
*
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
index 467e3d1..f3c0fe3 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
@@ -483,11 +483,6 @@ public class MBeanJMXAdapter implements ManagementConstants {
getMemberNameOrId(member))));
}
- public static ObjectName getAsyncEventQueueMBeanName(String member, String queueId) {
- return getObjectName((MessageFormat.format(OBJECTNAME__ASYNCEVENTQUEUE_MXBEAN, queueId,
- makeCompliantName(member))));
- }
-
public static ObjectName getDistributedRegionMbeanName(String regionPath) {
return getObjectName((MessageFormat.format(OBJECTNAME__DISTRIBUTEDREGION_MXBEAN,
makeCompliantRegionPath(regionPath))));
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
index 60615db..8f2c9c7 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/SystemManagementService.java
@@ -18,6 +18,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
import javax.management.Notification;
import javax.management.ObjectName;
@@ -358,6 +359,13 @@ public class SystemManagementService extends BaseManagementService {
}
@Override
+ public Set<ObjectName> getAsyncEventQueueMBeanNames(DistributedMember member) {
+ Set<ObjectName> mBeanNames = this.queryMBeanNames(member);
+ return mBeanNames.stream().filter(x -> "AsyncEventQueue".equals(x.getKeyProperty("service")))
+ .collect(Collectors.toSet());
+ }
+
+ @Override
public ObjectName registerMBean(Object object, ObjectName objectName) {
verifyManagementService();
return jmxAdapter.registerMBean(object, objectName, false);
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
index 02eb8b8..8ad2282 100755
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/CliUtil.java
@@ -60,6 +60,7 @@ import org.apache.geode.management.DistributedRegionMXBean;
import org.apache.geode.management.ManagementService;
import org.apache.geode.management.cli.Result;
import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.management.internal.SystemManagementService;
import org.apache.geode.management.internal.cli.exceptions.UserErrorException;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.cli.result.ResultBuilder;
@@ -406,6 +407,20 @@ public class CliUtil {
return result;
}
+ public static Set<DistributedMember> getMembersWithAsyncEventQueue(InternalCache cache,
+ String queueId) {
+ Set<DistributedMember> members = findMembers(null, null);
+ return members.stream().filter(m -> getAsyncEventQueueIds(cache, m).contains(queueId))
+ .collect(Collectors.toSet());
+ }
+
+ public static Set<String> getAsyncEventQueueIds(InternalCache cache, DistributedMember member) {
+ SystemManagementService managementService =
+ (SystemManagementService) ManagementService.getExistingManagementService(cache);
+ return managementService.getAsyncEventQueueMBeanNames(member).stream()
+ .map(x -> x.getKeyProperty("queue")).collect(Collectors.toSet());
+ }
+
static class CustomFileFilter implements FileFilter {
private String extensionWithDot;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
index 134a153..6a30378 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
@@ -183,4 +183,8 @@ public interface GfshCommand extends CommandMarker {
default Set<DistributedMember> findAnyMembersForRegion(InternalCache cache, String regionPath) {
return CliUtil.getRegionAssociatedMembers(regionPath, cache, false);
}
+
+ default Set<DistributedMember> findMembersWithAsyncEventQueue(String queueId) {
+ return CliUtil.getMembersWithAsyncEventQueue(getCache(), queueId);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
index f1050d8..d3b97b8 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/CliUtilDUnitTest.java
@@ -31,6 +31,7 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
import org.apache.geode.management.internal.cli.exceptions.UserErrorException;
import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
@@ -184,6 +185,35 @@ public class CliUtilDUnitTest {
}
+ @Test
+ public void getMembersWithQueueId() throws Exception {
+ gfsh.executeAndAssertThat("create async-event-queue --id=queue1 --group=group1 --listener="
+ + MyAsyncEventListener.class.getName()).statusIsSuccess();
+ gfsh.executeAndAssertThat("create async-event-queue --id=queue2 --group=group2 --listener="
+ + MyAsyncEventListener.class.getName()).statusIsSuccess();
+ gfsh.executeAndAssertThat(
+ "create async-event-queue --id=queue --listener=" + MyAsyncEventListener.class.getName())
+ .statusIsSuccess();
+
+ locator.waitTillAsyncEventQueuesAreReadyOnServers("queue1", 2);
+ locator.waitTillAsyncEventQueuesAreReadyOnServers("queue2", 2);
+ locator.waitTillAsyncEventQueuesAreReadyOnServers("queue", 4);
+
+ locator.invoke(() -> {
+ members =
+ CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue1");
+ assertThat(getNames(members)).containsExactlyInAnyOrder("member1", "member2");
+
+ members =
+ CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue2");
+ assertThat(getNames(members)).containsExactlyInAnyOrder("member3", "member4");
+
+ members = CliUtil.getMembersWithAsyncEventQueue(LocatorServerStartupRule.getCache(), "queue");
+ assertThat(getNames(members)).containsExactlyInAnyOrder("member1", "member2", "member3",
+ "member4");
+ });
+ }
+
private static Set<String> getNames(Set<DistributedMember> members) {
return members.stream().map(DistributedMember::getName).collect(Collectors.toSet());
}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
index 0e9cfbc..4e04b8e 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/MemberVM.java
@@ -137,4 +137,12 @@ public class MemberVM implements Member {
vm.invoke(() -> LocatorServerStartupRule.memberStarter.waitTillDiskStoreIsReady(diskstoreName,
serverCount));
}
+
+ public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int serverCount) {
+ vm.invoke(() -> {
+ LocatorServerStartupRule.memberStarter.waitTillAsyncEventQueuesAreReadyOnServers(queueId,
+ serverCount);
+ });
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
index da0f588..e2dcc9c 100644
--- a/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/junit/rules/MemberStarterRule.java
@@ -43,6 +43,7 @@ import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.management.DistributedRegionMXBean;
import org.apache.geode.management.DistributedSystemMXBean;
import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.internal.cli.CliUtil;
import org.apache.geode.security.SecurityManager;
import org.apache.geode.test.junit.rules.serializable.SerializableExternalResource;
@@ -205,9 +206,9 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
if (properties.containsKey(NAME)) {
name = properties.getProperty(NAME);
} else {
- if (this instanceof ServerStarterRule)
+ if (this instanceof ServerStarterRule) {
name = "server";
- else {
+ } else {
name = "locator";
}
}
@@ -263,6 +264,11 @@ public abstract class MemberStarterRule<T> extends SerializableExternalResource
.until(() -> getDiskStoreCount(diskstoreName) == serverCount);
}
+ public void waitTillAsyncEventQueuesAreReadyOnServers(String queueId, int serverCount) {
+ await().atMost(2, TimeUnit.SECONDS).until(
+ () -> CliUtil.getMembersWithAsyncEventQueue(getCache(), queueId).size() == serverCount);
+ }
+
abstract void stopMember();
@Override
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].