You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/10/03 20:52:53 UTC

[geode] 09/11: GEODE-7179: alter async queue command to change state of event processor during creation (#4057)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a4679a7670e41ebecb3ab64e5bc425bcd0bbc773
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Fri Sep 13 14:35:46 2019 -0700

    GEODE-7179: alter async queue command to change state of event processor during creation (#4057)
    
    * Alter the state of the event processor during the creation of the AEQ
    	* The state can be changed to paused from not paused and vice versa.
    
    Co-authored-by: Donal Evans <do...@pivotal.io>
    Co-authored-by: Nabarun Nag <na...@cs.wisc.edu>
    Co-authored-by: Benjamin Ross <br...@pivotal.io>
---
 .../AlterAsyncEventQueueCommandDUnitTest.java      | 147 +++++++++++++++++++--
 .../cli/commands/AlterAsyncEventQueueCommand.java  |  12 +-
 2 files changed, 147 insertions(+), 12 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java
index ba42d27..0a3c575 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java
@@ -15,6 +15,7 @@
 
 package org.apache.geode.management.internal.cli.commands;
 
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import org.junit.Before;
@@ -23,6 +24,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
@@ -34,6 +36,10 @@ import org.apache.geode.test.junit.rules.GfshCommandRule;
 @Category({AEQTest.class})
 public class AlterAsyncEventQueueCommandDUnitTest {
 
+  private static final int ALTERED_BATCH_SIZE = 200;
+  private static final int ALTERED_BATCH_TIME_INTERVAL = 300;
+  private static final int ALTERED_MAXIMUM_QUEUE_MEMORY = 400;
+
   @Rule
   public ClusterStartupRule lsRule = new ClusterStartupRule();
 
@@ -60,22 +66,140 @@ public class AlterAsyncEventQueueCommandDUnitTest {
     server1.invoke(() -> {
       InternalCache cache = ClusterStartupRule.getCache();
       AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
-      assertThat(queue.getBatchSize()).isEqualTo(100);
-      assertThat(queue.getBatchTimeInterval()).isEqualTo(5);
-      assertThat(queue.getMaximumQueueMemory()).isEqualTo(100);
+      assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+      assertThat(queue.getMaximumQueueMemory())
+          .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+    });
+
+    gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 --batch-size="
+        + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+        + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY).statusIsSuccess();
+
+    // verify that server1's event queue still has the default value
+    // without restart
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+      assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+      assertThat(queue.getMaximumQueueMemory())
+          .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+      assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+    });
+
+    // restart locator and server without clearing the file system
+    server1.stop(false);
+    locator.stop(false);
+
+    locator = lsRule.startLocatorVM(0);
+    server1 = lsRule.startServerVM(1, "group1", locator.getPort());
+    // verify that server1's queue is updated
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+      assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
+      assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
+      assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+    });
+  }
+
+  @Test
+  public void whenAlterCommandUsedToChangeFromPauseToResumeThenAEQBehaviorMustChange()
+      throws Exception {
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --pause-event-processing=true --id=queue1 --group=group1 --listener="
+            + MyAsyncEventListener.class.getName())
+        .statusIsSuccess();
+
+    locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1);
+
+    // verify that server1's event queue has the default values and was created paused
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+      assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+      assertThat(queue.isDispatchingPaused()).isTrue();
+      assertThat(queue.getMaximumQueueMemory())
+          .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+    });
+
+    gfsh.executeAndAssertThat(
+        "alter async-event-queue --id=queue1 --pause-event-processing=false --batch-size="
+            + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+            + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY)
+        .statusIsSuccess();
+
+    // verify that, before any restart, server1's running queue is unchanged by the alter:
+    // it still has the default values and dispatching is still paused
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+      assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+      assertThat(queue.getMaximumQueueMemory())
+          .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+      assertThat(queue.isDispatchingPaused()).isTrue();
+      assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+    });
+
+    // restart locator and server without clearing the file system
+    server1.stop(false);
+    locator.stop(false);
+
+    locator = lsRule.startLocatorVM(0);
+    server1 = lsRule.startServerVM(1, "group1", locator.getPort());
+    // verify that after the restart server1's queue has the altered values and is not paused
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+      assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
+      assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
+      assertThat(queue.isDispatchingPaused()).isFalse();
+      assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+    });
+  }
+
+  @Test
+  public void whenAlterCommandUsedToChangeFromResumeStateToPausedThenAEQBehaviorMustChange()
+      throws Exception {
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --pause-event-processing=false --id=queue1 --group=group1 --listener="
+            + MyAsyncEventListener.class.getName())
+        .statusIsSuccess();
+
+    locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1);
+
+    // verify that server1's event queue has the default value
+    server1.invoke(() -> {
+      InternalCache cache = ClusterStartupRule.getCache();
+      AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+      assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+      assertThat(queue.isDispatchingPaused()).isFalse();
+      assertThat(queue.getMaximumQueueMemory())
+          .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
     });
 
-    gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 " + "--batch-size=200 "
-        + "--batch-time-interval=300 " + "--max-queue-memory=400").statusIsSuccess();
+    gfsh.executeAndAssertThat(
+        "alter async-event-queue --id=queue1 --pause-event-processing=true --batch-size="
+            + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+            + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY)
+        .statusIsSuccess();
 
     // verify that server1's event queue still has the default value
     // without restart
     server1.invoke(() -> {
       InternalCache cache = ClusterStartupRule.getCache();
       AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
-      assertThat(queue.getBatchSize()).isEqualTo(100);
-      assertThat(queue.getBatchTimeInterval()).isEqualTo(5);
-      assertThat(queue.getMaximumQueueMemory()).isEqualTo(100);
+      assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+      assertThat(queue.getMaximumQueueMemory())
+          .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+      assertThat(queue.isDispatchingPaused()).isFalse();
       assertThat(cache.getAsyncEventQueue("queue2")).isNull();
     });
 
@@ -89,9 +213,10 @@ public class AlterAsyncEventQueueCommandDUnitTest {
     server1.invoke(() -> {
       InternalCache cache = ClusterStartupRule.getCache();
       AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
-      assertThat(queue.getBatchSize()).isEqualTo(200);
-      assertThat(queue.getBatchTimeInterval()).isEqualTo(300);
-      assertThat(queue.getMaximumQueueMemory()).isEqualTo(400);
+      assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
+      assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
+      assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
+      assertThat(queue.isDispatchingPaused()).isTrue();
       assertThat(cache.getAsyncEventQueue("queue2")).isNull();
     });
   }
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java
index ebc53de..d3d60d0 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java
@@ -68,6 +68,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
   static final String BATCH_TIME_INTERVAL_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP;
   static final String MAXIMUM_QUEUE_MEMORY_HELP =
       CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP;
+  static final String PAUSE_EVENT_PROCESSING = "pause-event-processing";
+  static final String PAUSE_EVENT_PROCESSING_HELP =
+      "Pause event processing when the async event queue is created";
 
   @CliCommand(value = COMMAND_NAME, help = COMMAND_HELP)
   @CliMetaData(
@@ -80,7 +83,10 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
           help = BATCH_TIME_INTERVAL_HELP) Integer batchTimeInterval,
       @CliOption(key = MAX_QUEUE_MEMORY, help = MAXIMUM_QUEUE_MEMORY_HELP) Integer maxQueueMemory,
       @CliOption(key = IFEXISTS, help = IFEXISTS_HELP, specifiedDefaultValue = "true",
-          unspecifiedDefaultValue = "false") boolean ifExists)
+          unspecifiedDefaultValue = "false") boolean ifExists,
+      @CliOption(key = PAUSE_EVENT_PROCESSING, help = PAUSE_EVENT_PROCESSING_HELP,
+          specifiedDefaultValue = "true",
+          unspecifiedDefaultValue = "false") boolean pauseEventProcessing)
       throws IOException, SAXException, ParserConfigurationException, TransformerException,
       EntityNotFoundException {
 
@@ -98,6 +104,7 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
 
     CacheConfig.AsyncEventQueue aeqConfiguration = new CacheConfig.AsyncEventQueue();
     aeqConfiguration.setId(id);
+    aeqConfiguration.setPauseEventProcessing(pauseEventProcessing);
 
     if (batchSize != null) {
       aeqConfiguration.setBatchSize(batchSize + "");
@@ -163,6 +170,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
         if (StringUtils.isNotBlank(aeqConfiguration.getMaximumQueueMemory())) {
           queue.setMaximumQueueMemory(aeqConfiguration.getMaximumQueueMemory());
         }
+        if (aeqConfiguration.isPauseEventProcessing() != null) {
+          queue.setPauseEventProcessing(aeqConfiguration.isPauseEventProcessing());
+        }
         aeqConfigsHaveBeenUpdated = true;
       }