You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/10/03 20:52:53 UTC
[geode] 09/11: GEODE-7179: alter async queue command to change
state of event processor during creation (#4057)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch release/1.9.2
in repository https://gitbox.apache.org/repos/asf/geode.git
commit a4679a7670e41ebecb3ab64e5bc425bcd0bbc773
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Fri Sep 13 14:35:46 2019 -0700
GEODE-7179: alter async queue command to change state of event processor during creation (#4057)
* Alter the state of the event processor during the creation of the AEQ
* The state can be changed to paused from not paused and vice versa.
Co-authored-by: Donal Evans <do...@pivotal.io>
Co-authored-by: Nabarun Nag <na...@cs.wisc.edu>
Co-authored-by: Benjamin Ross <br...@pivotal.io>
---
.../AlterAsyncEventQueueCommandDUnitTest.java | 147 +++++++++++++++++++--
.../cli/commands/AlterAsyncEventQueueCommand.java | 12 +-
2 files changed, 147 insertions(+), 12 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java
index ba42d27..0a3c575 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java
@@ -15,6 +15,7 @@
package org.apache.geode.management.internal.cli.commands;
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.Before;
@@ -23,6 +24,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
+import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
@@ -34,6 +36,10 @@ import org.apache.geode.test.junit.rules.GfshCommandRule;
@Category({AEQTest.class})
public class AlterAsyncEventQueueCommandDUnitTest {
+ private static final int ALTERED_BATCH_SIZE = 200;
+ private static final int ALTERED_BATCH_TIME_INTERVAL = 300;
+ private static final int ALTERED_MAXIMUM_QUEUE_MEMORY = 400;
+
@Rule
public ClusterStartupRule lsRule = new ClusterStartupRule();
@@ -60,22 +66,140 @@ public class AlterAsyncEventQueueCommandDUnitTest {
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
- assertThat(queue.getBatchSize()).isEqualTo(100);
- assertThat(queue.getBatchTimeInterval()).isEqualTo(5);
- assertThat(queue.getMaximumQueueMemory()).isEqualTo(100);
+ assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+ assertThat(queue.getMaximumQueueMemory())
+ .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+ });
+
+ gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 --batch-size="
+ + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+ + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY).statusIsSuccess();
+
+ // verify that server1's event queue still has the default value
+ // without restart
+ server1.invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+ assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+ assertThat(queue.getMaximumQueueMemory())
+ .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+ assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+ });
+
+ // restart locator and server without clearing the file system
+ server1.stop(false);
+ locator.stop(false);
+
+ locator = lsRule.startLocatorVM(0);
+ server1 = lsRule.startServerVM(1, "group1", locator.getPort());
+ // verify that server1's queue is updated
+ server1.invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+ assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
+ assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
+ assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+ });
+ }
+
+ @Test
+ public void whenAlterCommandUsedToChangeFromPauseToResumeThenAEQBehaviorMustChange()
+ throws Exception {
+ gfsh.executeAndAssertThat(
+ "create async-event-queue --pause-event-processing=true --id=queue1 --group=group1 --listener="
+ + MyAsyncEventListener.class.getName())
+ .statusIsSuccess();
+
+ locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1);
+
+ // verify that server1's event queue has the default value
+ server1.invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+ assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+ assertThat(queue.isDispatchingPaused()).isTrue();
+ assertThat(queue.getMaximumQueueMemory())
+ .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+ });
+
+ gfsh.executeAndAssertThat(
+ "alter async-event-queue --id=queue1 --pause-event-processing=false --batch-size="
+ + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+ + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY)
+ .statusIsSuccess();
+
+ // verify that server1's event queue still has the default value
+ // without restart
+ server1.invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+ assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+ assertThat(queue.getMaximumQueueMemory())
+ .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+ assertThat(queue.isDispatchingPaused()).isTrue();
+ assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+ });
+
+ // restart locator and server without clearing the file system
+ server1.stop(false);
+ locator.stop(false);
+
+ locator = lsRule.startLocatorVM(0);
+ server1 = lsRule.startServerVM(1, "group1", locator.getPort());
+ // verify that server1's queue is updated
+ server1.invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+ assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
+ assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
+ assertThat(queue.isDispatchingPaused()).isFalse();
+ assertThat(cache.getAsyncEventQueue("queue2")).isNull();
+ });
+ }
+
+ @Test
+ public void whenAlterCommandUsedToChangeFromResumeStateToPausedThenAEQBehaviorMustChange()
+ throws Exception {
+ gfsh.executeAndAssertThat(
+ "create async-event-queue --pause-event-processing=false --id=queue1 --group=group1 --listener="
+ + MyAsyncEventListener.class.getName())
+ .statusIsSuccess();
+
+ locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1);
+
+ // verify that server1's event queue has the default value
+ server1.invoke(() -> {
+ InternalCache cache = ClusterStartupRule.getCache();
+ AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
+ assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+ assertThat(queue.isDispatchingPaused()).isFalse();
+ assertThat(queue.getMaximumQueueMemory())
+ .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
});
- gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 " + "--batch-size=200 "
- + "--batch-time-interval=300 " + "--max-queue-memory=400").statusIsSuccess();
+ gfsh.executeAndAssertThat(
+ "alter async-event-queue --id=queue1 --pause-event-processing=true --batch-size="
+ + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+ + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY)
+ .statusIsSuccess();
// verify that server1's event queue still has the default value
// without restart
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
- assertThat(queue.getBatchSize()).isEqualTo(100);
- assertThat(queue.getBatchTimeInterval()).isEqualTo(5);
- assertThat(queue.getMaximumQueueMemory()).isEqualTo(100);
+ assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
+ assertThat(queue.getMaximumQueueMemory())
+ .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
+ assertThat(queue.isDispatchingPaused()).isFalse();
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});
@@ -89,9 +213,10 @@ public class AlterAsyncEventQueueCommandDUnitTest {
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
- assertThat(queue.getBatchSize()).isEqualTo(200);
- assertThat(queue.getBatchTimeInterval()).isEqualTo(300);
- assertThat(queue.getMaximumQueueMemory()).isEqualTo(400);
+ assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
+ assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
+ assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
+ assertThat(queue.isDispatchingPaused()).isTrue();
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java
index ebc53de..d3d60d0 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java
@@ -68,6 +68,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
static final String BATCH_TIME_INTERVAL_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP;
static final String MAXIMUM_QUEUE_MEMORY_HELP =
CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP;
+ static final String PAUSE_EVENT_PROCESSING = "pause-event-processing";
+ static final String PAUSE_EVENT_PROCESSING_HELP =
+ "Pause event processing when the async event queue is created";
@CliCommand(value = COMMAND_NAME, help = COMMAND_HELP)
@CliMetaData(
@@ -80,7 +83,10 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
help = BATCH_TIME_INTERVAL_HELP) Integer batchTimeInterval,
@CliOption(key = MAX_QUEUE_MEMORY, help = MAXIMUM_QUEUE_MEMORY_HELP) Integer maxQueueMemory,
@CliOption(key = IFEXISTS, help = IFEXISTS_HELP, specifiedDefaultValue = "true",
- unspecifiedDefaultValue = "false") boolean ifExists)
+ unspecifiedDefaultValue = "false") boolean ifExists,
+ @CliOption(key = PAUSE_EVENT_PROCESSING, help = PAUSE_EVENT_PROCESSING_HELP,
+ specifiedDefaultValue = "true",
+ unspecifiedDefaultValue = "false") boolean pauseEventProcessing)
throws IOException, SAXException, ParserConfigurationException, TransformerException,
EntityNotFoundException {
@@ -98,6 +104,7 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
CacheConfig.AsyncEventQueue aeqConfiguration = new CacheConfig.AsyncEventQueue();
aeqConfiguration.setId(id);
+ aeqConfiguration.setPauseEventProcessing(pauseEventProcessing);
if (batchSize != null) {
aeqConfiguration.setBatchSize(batchSize + "");
@@ -163,6 +170,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
if (StringUtils.isNotBlank(aeqConfiguration.getMaximumQueueMemory())) {
queue.setMaximumQueueMemory(aeqConfiguration.getMaximumQueueMemory());
}
+ if (aeqConfiguration.isPauseEventProcessing() != null) {
+ queue.setPauseEventProcessing(aeqConfiguration.isPauseEventProcessing());
+ }
aeqConfigsHaveBeenUpdated = true;
}