You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/10/03 20:52:51 UTC

[geode] 07/11: GEODE-7149: Changes to support AsyncEventQueue's dispatcher status with AsyncEventQueue beans (#4029)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f49a4ac75fd163de68410c022e3598f93403655f
Author: agingade <ag...@pivotal.io>
AuthorDate: Wed Sep 11 11:09:01 2019 -0700

    GEODE-7149: Changes to support AsyncEventQueue's dispatcher status with AsyncEventQueue beans (#4029)
    
    Mbean changes to support AsyncEventQueue's pauseDispatcher() and resumeDispatcher() apis.
---
 .../geode/management/AsyncEventQueueMXBean.java    |   8 +
 .../internal/beans/AsyncEventQueueMBean.java       |   5 +
 .../internal/beans/AsyncEventQueueMBeanBridge.java |   4 +
 .../internal/beans/AsyncEventQueueMBeanTest.java   |  52 +++
 .../geode/management/AEQManagementDUnitTest.java   | 415 +++++++++++++++++++++
 5 files changed, 484 insertions(+)

diff --git a/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java b/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java
index e6d68f7..6feb5c4 100644
--- a/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/AsyncEventQueueMXBean.java
@@ -127,4 +127,12 @@ public interface AsyncEventQueueMXBean {
    * Returns the number of bytes overflowed to disk for this Sender.
    */
   long getBytesOverflowedToDisk();
+
+  /**
+   * Returns the state of the event dispatcher.
+   *
+   * @return True if the dispatcher is paused, false otherwise.
+   */
+  boolean isDispatchingPaused();
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
index bf7afd0..bd95a3a 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBean.java
@@ -124,4 +124,9 @@ public class AsyncEventQueueMBean extends NotificationBroadcasterSupport
   public void stopMonitor() {
     bridge.stopMonitor();
   }
+
+  @Override
+  public boolean isDispatchingPaused() {
+    return bridge.isDispatchingPaused();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
index b1a1caa..e09ab08 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanBridge.java
@@ -147,4 +147,8 @@ public class AsyncEventQueueMBeanBridge {
   public void stopMonitor() {
     monitor.stopListener();
   }
+
+  public boolean isDispatchingPaused() {
+    return queueImpl.isDispatchingPaused();
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanTest.java
new file mode 100644
index 0000000..6901db4
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/beans/AsyncEventQueueMBeanTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.beans;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.JMXTest;
+
+@Category(JMXTest.class)
+public class AsyncEventQueueMBeanTest {
+
+  private AsyncEventQueueMBean asyncEventQueueMBean;
+
+  private AsyncEventQueueMBeanBridge asyncEventQueueMBeanBridge;
+
+  @Test
+  public void asyncEvenQueueCreatedWithDispatcherNotPaused() {
+    asyncEventQueueMBeanBridge = mock(AsyncEventQueueMBeanBridge.class);
+    asyncEventQueueMBean = new AsyncEventQueueMBean(asyncEventQueueMBeanBridge);
+
+    when(asyncEventQueueMBeanBridge.isDispatchingPaused()).thenReturn(false);
+
+    assertThat(asyncEventQueueMBean.isDispatchingPaused()).isFalse();
+  }
+
+  @Test
+  public void asyncEvenQueueCreatedWithDispatcherPaused() {
+    asyncEventQueueMBeanBridge = mock(AsyncEventQueueMBeanBridge.class);
+    asyncEventQueueMBean = new AsyncEventQueueMBean(asyncEventQueueMBeanBridge);
+
+    when(asyncEventQueueMBeanBridge.isDispatchingPaused()).thenReturn(true);
+
+    assertThat(asyncEventQueueMBean.isDispatchingPaused()).isTrue();
+  }
+}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/management/AEQManagementDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/management/AEQManagementDUnitTest.java
new file mode 100644
index 0000000..5ce3038
--- /dev/null
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/management/AEQManagementDUnitTest.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management;
+
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.Properties;
+
+import javax.management.ObjectName;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.AEQTest;
+
+@Category({AEQTest.class})
+public class AEQManagementDUnitTest implements Serializable {
+
+  private MemberVM locator, server1, server2;
+
+  private String aeqID = "aeqMBeanTest";
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
+
+  public AEQManagementDUnitTest() {
+    super();
+  }
+
+  @Before
+  public void setup() {
+    locator = clusterStartupRule.startLocatorVM(0, locatorProperties());
+    locator.invoke(() -> startManagerService());
+    server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+    server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+  }
+
+  private void startManagerService() {
+    SystemManagementService service = (SystemManagementService) ManagementService
+        .getManagementService(ClusterStartupRule.getCache());
+    service.createManager();
+  }
+
+  @Test
+  public void testCreateAsyncEventQueueAndVerifyBeansRegistered() {
+    server1.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server1", false));
+
+    server2.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server2", false));
+
+    final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF =
+        () -> ClusterStartupRule.getCache().getMyId();
+
+    DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF);
+
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer1);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1);
+
+      assertThat(aeqBean).isNotNull();
+    });
+
+    DistributedMember memberServer2 = server2.invoke(
+        memberSerializableCallableIF);
+
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer2);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2);
+
+      assertThat(aeqBean).isNotNull();
+    });
+
+  }
+
+  @Test
+  public void testDestroyAsyncEventQueueAndVerifyBeansAreUpdated() {
+    server1.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server1", false));
+
+    server2.invoke(() -> createAsyncEventQueueAndVerifyBeanRegisteration("server2", false));
+
+    final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF =
+        () -> ClusterStartupRule.getCache().getMyId();
+
+    DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF);
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer1);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1);
+
+      assertThat(aeqBean).isNotNull();
+    });
+
+
+    DistributedMember memberServer2 = server2.invoke(memberSerializableCallableIF);
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer2);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2);
+
+      assertThat(aeqBean).isNotNull();
+    });
+
+    server1.invoke(() -> {
+      destroyAsyncEventQueue();
+      ManagementService mService = waitForAeqBeanToUnRegister();
+
+      assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNull();
+    });
+
+    server2.invoke(() -> {
+      destroyAsyncEventQueue();
+      ManagementService mService = waitForAeqBeanToUnRegister();
+
+      assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNull();
+    });
+
+
+    locator.invoke(() -> {
+      waitForProxyBeansRemoval(memberServer1);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1);
+
+      assertThat(aeqBean).isNull();
+    });
+
+    locator.invoke(() -> {
+      waitForProxyBeansRemoval(memberServer2);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2);
+
+      assertThat(aeqBean).isNull();
+    });
+
+  }
+
+  @Test
+  public void testCreateAEQWithDispatcherInPausedStateAndVerifyUsingMBean() {
+    server1.invoke(() -> {
+      createAsyncEventQueueAndVerifyBeanRegisteration("server1", true);
+
+      ManagementService mService =
+          ManagementService.getManagementService(ClusterStartupRule.getCache());
+      AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+
+    server2.invoke(() -> {
+      createAsyncEventQueueAndVerifyBeanRegisteration("server2", true);
+
+      ManagementService mService =
+          ManagementService.getManagementService(ClusterStartupRule.getCache());
+      AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+
+    final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF =
+        () -> ClusterStartupRule.getCache().getMyId();
+
+    DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF);
+
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer1);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+
+    DistributedMember memberServer2 = server2.invoke(
+        memberSerializableCallableIF);
+
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer2);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+  }
+
+  @Test
+  public void testCreateAEQWithDispatcherInPausedStateAndResumeAndVerifyUsingMBean() {
+    server1.invoke(() -> {
+      createAsyncEventQueueAndVerifyBeanRegisteration("server1", true);
+
+      ManagementService mService =
+          ManagementService.getManagementService(ClusterStartupRule.getCache());
+      AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+
+    server2.invoke(() -> {
+      createAsyncEventQueueAndVerifyBeanRegisteration("server2", true);
+
+      ManagementService mService =
+          ManagementService.getManagementService(ClusterStartupRule.getCache());
+      AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+
+    final SerializableCallableIF<InternalDistributedMember> memberSerializableCallableIF =
+        () -> ClusterStartupRule.getCache().getMyId();
+
+    DistributedMember memberServer1 = server1.invoke(memberSerializableCallableIF);
+
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer1);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+
+    DistributedMember memberServer2 = server2.invoke(
+        memberSerializableCallableIF);
+
+    locator.invoke(() -> {
+      waitForProxyBeansArrival(memberServer2);
+
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2);
+
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    });
+
+    server1.invoke(() -> {
+      AsyncEventQueue asyncEventQueue = ClusterStartupRule.getCache().getAsyncEventQueue(aeqID);
+      asyncEventQueue.resumeEventDispatching();
+
+      ManagementService mService =
+          ManagementService.getManagementService(ClusterStartupRule.getCache());
+
+      GeodeAwaitility.await().untilAsserted(() -> {
+        AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID);
+        assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNotNull();
+        assertThat(aeqBean.isDispatchingPaused()).isEqualTo(false);
+      });
+    });
+
+    locator.invoke(() -> GeodeAwaitility.await().untilAsserted(() -> {
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer1);
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(true);
+    }));
+
+    server2.invoke(() -> {
+      AsyncEventQueue asyncEventQueue = ClusterStartupRule.getCache().getAsyncEventQueue(aeqID);
+      asyncEventQueue.resumeEventDispatching();
+
+      ManagementService mService =
+          ManagementService.getManagementService(ClusterStartupRule.getCache());
+
+      GeodeAwaitility.await().untilAsserted(() -> {
+        AsyncEventQueueMXBean aeqBean = mService.getLocalAsyncEventQueueMXBean(aeqID);
+        assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNotNull();
+        assertThat(aeqBean.isDispatchingPaused()).isEqualTo(false);
+      });
+    });
+
+    locator.invoke(() -> GeodeAwaitility.await().untilAsserted(() -> {
+      AsyncEventQueueMXBean aeqBean = getAsyncEventQueueMXBean(memberServer2);
+      assertThat(aeqBean.isDispatchingPaused()).isEqualTo(false);
+    }));
+
+  }
+
+  private AsyncEventQueueMXBean getAsyncEventQueueMXBean(DistributedMember memberServer1) {
+    SystemManagementService service =
+        (SystemManagementService) SystemManagementService
+            .getManagementService(ClusterStartupRule.getCache());
+    ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(memberServer1, aeqID);
+    return service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class);
+  }
+
+  private void createAsyncEventQueueAndVerifyBeanRegisteration(String diskStoreName,
+      boolean dispatcherPaused) {
+    createAsyncEventQueue(aeqID, false, 100, 100, false, false,
+        diskStoreName, false, dispatcherPaused);
+    waitForAeqBeanToRegister();
+  }
+
+  private void destroyAsyncEventQueue() {
+    AsyncEventQueueImpl aeq =
+        (AsyncEventQueueImpl) ClusterStartupRule.getCache().getAsyncEventQueue(aeqID);
+    assertThat(aeq).isNotNull();
+    aeq.destroy();
+  }
+
+  private void waitForProxyBeansArrival(DistributedMember member) {
+    GeodeAwaitility.await().untilAsserted(() -> {
+      SystemManagementService service =
+          (SystemManagementService) SystemManagementService.getManagementService(
+              ClusterStartupRule.getCache());
+      ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(member, aeqID);
+      assertThat(queueMBeanName).isNotNull();
+      assertThat(service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class)).isNotNull();
+    });
+  }
+
+  private void waitForProxyBeansRemoval(DistributedMember member) {
+    GeodeAwaitility.await().untilAsserted(() -> {
+      SystemManagementService service =
+          (SystemManagementService) SystemManagementService.getManagementService(
+              ClusterStartupRule.getCache());
+      ObjectName queueMBeanName = service.getAsyncEventQueueMBeanName(member, aeqID);
+      assertThat(service.getMBeanProxy(queueMBeanName, AsyncEventQueueMXBean.class)).isNull();
+    });
+  }
+
+  private static int getLocatorPort() {
+    return Locator.getLocators().get(0).getPort();
+  }
+
+  private ManagementService waitForAeqBeanToRegister() {
+    ManagementService mService =
+        ManagementService.getManagementService(ClusterStartupRule.getCache());
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNotNull();
+    });
+
+    return mService;
+  }
+
+  private ManagementService waitForAeqBeanToUnRegister() {
+    ManagementService mService =
+        ManagementService.getManagementService(ClusterStartupRule.getCache());
+
+    GeodeAwaitility.await().untilAsserted(() -> {
+      assertThat(mService.getLocalAsyncEventQueueMXBean(aeqID)).isNull();
+    });
+
+    return mService;
+  }
+
+  private Properties locatorProperties() {
+    int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    Properties props = new Properties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOG_LEVEL, "fine");
+    props.setProperty(ConfigurationProperties.JMX_MANAGER_HOSTNAME_FOR_CLIENTS, "localhost");
+    props.setProperty(ConfigurationProperties.JMX_MANAGER_PORT, "" + jmxPort);
+
+    return props;
+  }
+
+  public void createAsyncEventQueue(String asyncChannelId, boolean isParallel,
+      Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
+      String diskStoreName, boolean isDiskSynchronous, boolean dispatcherPaused) {
+
+    if (diskStoreName != null) {
+      File directory = new File(
+          asyncChannelId + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] {directory};
+      DiskStoreFactory dsf = ClusterStartupRule.getCache().createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      dsf.create(diskStoreName);
+    }
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+    AsyncEventQueueFactory factory = ClusterStartupRule.getCache().createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    factory.setDispatcherThreads(3);
+    if (dispatcherPaused) {
+      factory.pauseEventDispatching();
+    }
+    factory.create(asyncChannelId, asyncEventListener);
+  }
+
+}