You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2017/11/29 21:53:17 UTC
[geode] branch develop updated: GEODE-3539: add test coverage for "create async-event-queue" and "lis… (#1093)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new de22c2d GEODE-3539: add test coverage for "create async-event-queue" and "lis… (#1093)
de22c2d is described below
commit de22c2d1b87878fc457e0e24f62011bb5e13a20b
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Wed Nov 29 13:53:15 2017 -0800
GEODE-3539: add test coverage for "create async-event-queue" and "lis… (#1093)
---
.../cli/commands/CreateAsyncEventQueueCommand.java | 124 +++---
.../internal/cli/commands/GfshCommand.java | 4 +-
.../internal/cli/functions/CliFunctionResult.java | 2 +-
.../functions/CreateAsyncEventQueueFunction.java | 53 +--
.../CreateAsyncEventQueueCommandDUnitTest.java | 128 ++++++
.../commands/CreateAsyncEventQueueCommandTest.java | 213 ++++++++++
.../commands/DestroyGatewaySenderCommandTest.java | 3 +-
.../ListAsyncEventQueuesCommandDUnitTest.java | 81 ++++
.../cli/commands/QueueCommandsDUnitTest.java | 437 ---------------------
.../cli/functions/CliFunctionResultTest.java | 2 +-
.../test/junit/assertions/CommandResultAssert.java | 14 +-
.../geode/test/junit/rules/GfshParserRule.java | 3 +
.../cli/commands/CommandOverHttpDUnitTest.java | 4 +-
13 files changed, 508 insertions(+), 560 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommand.java
index eb89c21..0b46c0b 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommand.java
@@ -23,16 +23,17 @@ import java.util.concurrent.atomic.AtomicReference;
import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
-import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.management.cli.ConverterHint;
import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.CliUtil;
import org.apache.geode.management.internal.cli.functions.AsyncEventQueueFunctionArgs;
import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.functions.CreateAsyncEventQueueFunction;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.result.CommandResult;
import org.apache.geode.management.internal.cli.result.ResultBuilder;
import org.apache.geode.management.internal.cli.result.TabularResultData;
import org.apache.geode.management.internal.configuration.domain.XmlEntity;
@@ -52,15 +53,15 @@ public class CreateAsyncEventQueueCommand implements GfshCommand {
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__GROUP__HELP) String[] groups,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PARALLEL,
unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
- help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PARALLEL__HELP) Boolean parallel,
+ help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PARALLEL__HELP) boolean parallel,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION,
unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
- help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION__HELP) Boolean enableBatchConflation,
+ help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION__HELP) boolean enableBatchConflation,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE,
unspecifiedDefaultValue = "100",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE__HELP) int batchSize,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL,
- unspecifiedDefaultValue = "1000",
+ unspecifiedDefaultValue = AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL + "",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP) int batchTimeInterval,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT,
unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
@@ -69,16 +70,16 @@ public class CreateAsyncEventQueueCommand implements GfshCommand {
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISK_STORE__HELP) String diskStore,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS,
unspecifiedDefaultValue = "true", specifiedDefaultValue = "true",
- help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS__HELP) Boolean diskSynchronous,
+ help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS__HELP) boolean diskSynchronous,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY,
- unspecifiedDefaultValue = "false", specifiedDefaultValue = "false",
- help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY__HELP) Boolean ignoreEvictionAndExpiration,
+ unspecifiedDefaultValue = "false", specifiedDefaultValue = "true",
+ help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY__HELP) boolean forwardExpirationDestroy,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY,
unspecifiedDefaultValue = "100",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP) int maxQueueMemory,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS,
unspecifiedDefaultValue = "1",
- help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS__HELP) Integer dispatcherThreads,
+ help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS__HELP) int dispatcherThreads,
@CliOption(key = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY,
unspecifiedDefaultValue = "KEY",
help = CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY__HELP) String orderPolicy,
@@ -97,75 +98,60 @@ public class CreateAsyncEventQueueCommand implements GfshCommand {
}
Properties listenerProperties = new Properties();
- try {
- if (listenerParamsAndValues != null) {
- for (String listenerParamsAndValue : listenerParamsAndValues) {
- final int hashPosition = listenerParamsAndValue.indexOf('#');
- if (hashPosition == -1) {
- listenerProperties.put(listenerParamsAndValue, "");
- } else {
- listenerProperties.put(listenerParamsAndValue.substring(0, hashPosition),
- listenerParamsAndValue.substring(hashPosition + 1));
- }
+ if (listenerParamsAndValues != null) {
+ for (String listenerParamsAndValue : listenerParamsAndValues) {
+ final int hashPosition = listenerParamsAndValue.indexOf('#');
+ if (hashPosition == -1) {
+ listenerProperties.put(listenerParamsAndValue, "");
+ } else {
+ listenerProperties.put(listenerParamsAndValue.substring(0, hashPosition),
+ listenerParamsAndValue.substring(hashPosition + 1));
}
}
+ }
- TabularResultData tabularData = ResultBuilder.createTabularResultData();
- boolean accumulatedData = false;
-
- Set<DistributedMember> targetMembers = CliUtil.findMembers(groups, null);
-
- if (targetMembers.isEmpty()) {
- return ResultBuilder.createUserErrorResult(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
- }
-
- AsyncEventQueueFunctionArgs aeqArgs = new AsyncEventQueueFunctionArgs(id, parallel,
- enableBatchConflation, batchSize, batchTimeInterval, persistent, diskStore,
- diskSynchronous, maxQueueMemory, dispatcherThreads, orderPolicy, gatewayEventFilters,
- gatewaySubstitutionListener, listener, listenerProperties, ignoreEvictionAndExpiration);
+ Set<DistributedMember> targetMembers = getMembers(groups, null);
- ResultCollector<?, ?> rc =
- CliUtil.executeFunction(new CreateAsyncEventQueueFunction(), aeqArgs, targetMembers);
+ TabularResultData tabularData = ResultBuilder.createTabularResultData();
+ AsyncEventQueueFunctionArgs aeqArgs = new AsyncEventQueueFunctionArgs(id, parallel,
+ enableBatchConflation, batchSize, batchTimeInterval, persistent, diskStore, diskSynchronous,
+ maxQueueMemory, dispatcherThreads, orderPolicy, gatewayEventFilters,
+ gatewaySubstitutionListener, listener, listenerProperties, forwardExpirationDestroy);
- List<CliFunctionResult> results = CliFunctionResult.cleanResults((List<?>) rc.getResult());
+ CreateAsyncEventQueueFunction function = new CreateAsyncEventQueueFunction();
+ List<CliFunctionResult> results = execute(function, aeqArgs, targetMembers);
- AtomicReference<XmlEntity> xmlEntity = new AtomicReference<>();
- for (CliFunctionResult result : results) {
- if (result.getThrowable() != null) {
- tabularData.accumulate("Member", result.getMemberIdOrName());
- tabularData.accumulate("Result", "ERROR: " + result.getThrowable().getClass().getName()
- + ": " + result.getThrowable().getMessage());
- accumulatedData = true;
- tabularData.setStatus(Result.Status.ERROR);
- } else if (result.isSuccessful()) {
- tabularData.accumulate("Member", result.getMemberIdOrName());
- tabularData.accumulate("Result", result.getMessage());
- accumulatedData = true;
+ if (results.size() == 0) {
+ throw new RuntimeException("No results received.");
+ }
- if (xmlEntity.get() == null) {
- xmlEntity.set(result.getXmlEntity());
- }
+ AtomicReference<XmlEntity> xmlEntity = new AtomicReference<>();
+ for (CliFunctionResult result : results) {
+ if (!result.isSuccessful()) {
+ tabularData.accumulate("Member", result.getMemberIdOrName());
+ tabularData.accumulate("Result", "ERROR: " + result.getErrorMessage());
+ } else {
+ tabularData.accumulate("Member", result.getMemberIdOrName());
+ tabularData.accumulate("Result", result.getMessage());
+
+ // if one member is successful in creating the AEQ and xmlEntity is not set yet,
+ // save the xmlEntity that is to be persisted
+ if (result.isSuccessful() && xmlEntity.get() == null) {
+ xmlEntity.set(result.getXmlEntity());
}
}
-
- if (!accumulatedData) {
- return ResultBuilder.createInfoResult("Unable to create async event queue(s).");
- }
-
- Result result = ResultBuilder.buildResult(tabularData);
- if (xmlEntity.get() != null) {
- persistClusterConfiguration(result,
- () -> getSharedConfiguration().addXmlEntity(xmlEntity.get(), groups));
- }
- return result;
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
- } catch (Throwable th) {
- SystemFailure.checkFailure();
- return ResultBuilder.createGemFireErrorResult(
- CliStrings.format(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ERROR_WHILE_CREATING_REASON_0,
- new Object[] {th.getMessage()}));
}
+ CommandResult commandResult = ResultBuilder.buildResult(tabularData);
+ if (xmlEntity.get() != null) {
+ persistClusterConfiguration(commandResult,
+ () -> getSharedConfiguration().addXmlEntity(xmlEntity.get(), groups));
+ }
+ return commandResult;
+ }
+
+ List<CliFunctionResult> execute(Function function, Object args,
+ Set<DistributedMember> targetMembers) {
+ ResultCollector rc = executeFunction(function, args, targetMembers);
+ return CliFunctionResult.cleanResults((List<?>) rc.getResult());
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
index 6a30378..f94b6d1 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/GfshCommand.java
@@ -170,12 +170,12 @@ public interface GfshCommand extends CommandMarker {
return CliUtil.getRegionAssociatedMembers(regionPath, cache, true);
}
- default ResultCollector<?, ?> executeFunction(final Function function, Object args,
+ default ResultCollector<?, ?> executeFunction(Function function, Object args,
final Set<DistributedMember> targetMembers) {
return CliUtil.executeFunction(function, args, targetMembers);
}
- default ResultCollector<?, ?> executeFunction(final Function function, Object args,
+ default ResultCollector<?, ?> executeFunction(Function function, Object args,
final DistributedMember targetMember) {
return executeFunction(function, args, Collections.singleton(targetMember));
}
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java
index 4bbf389..23313e7 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CliFunctionResult.java
@@ -125,7 +125,7 @@ public class CliFunctionResult implements Comparable<CliFunctionResult>, DataSer
// otherwise use exception's message
if (throwable != null) {
- return throwable.getMessage();
+ return throwable.getClass().getName() + ": " + throwable.getMessage();
}
return null;
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateAsyncEventQueueFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateAsyncEventQueueFunction.java
index 437e276..12ab783 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateAsyncEventQueueFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/CreateAsyncEventQueueFunction.java
@@ -21,7 +21,6 @@ import java.util.Properties;
import joptsimple.internal.Strings;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
@@ -38,7 +37,6 @@ import org.apache.geode.internal.InternalEntity;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.xmlcache.CacheXml;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.configuration.domain.XmlEntity;
/**
@@ -85,21 +83,15 @@ public class CreateAsyncEventQueueFunction extends FunctionAdapter implements In
String[] gatewayEventFilters = aeqArgs.getGatewayEventFilters();
if (gatewayEventFilters != null) {
for (String gatewayEventFilter : gatewayEventFilters) {
- Class<?> gatewayEventFilterKlass =
- forName(gatewayEventFilter, CliStrings.CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER);
asyncEventQueueFactory
- .addGatewayEventFilter((GatewayEventFilter) newInstance(gatewayEventFilterKlass,
- CliStrings.CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER));
+ .addGatewayEventFilter((GatewayEventFilter) newInstance(gatewayEventFilter));
}
}
String gatewaySubstitutionFilter = aeqArgs.getGatewaySubstitutionFilter();
if (gatewaySubstitutionFilter != null) {
- Class<?> gatewayEventSubstitutionFilterKlass = forName(gatewaySubstitutionFilter,
- CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER);
asyncEventQueueFactory.setGatewayEventSubstitutionListener(
- (GatewayEventSubstitutionFilter<?, ?>) newInstance(gatewayEventSubstitutionFilterKlass,
- CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER));
+ (GatewayEventSubstitutionFilter<?, ?>) newInstance(gatewaySubstitutionFilter));
}
String listenerClassName = aeqArgs.getListenerClassName();
@@ -130,48 +122,19 @@ public class CreateAsyncEventQueueFunction extends FunctionAdapter implements In
} catch (CacheClosedException cce) {
context.getResultSender().lastResult(new CliFunctionResult(memberId, false, null));
-
- } catch (VirtualMachineError e) {
- SystemFailure.initiateFailure(e);
- throw e;
-
- } catch (Throwable th) {
- SystemFailure.checkFailure();
- logger.error("Could not create async event queue: {}", th.getMessage(), th);
- context.getResultSender().lastResult(new CliFunctionResult(memberId, th, null));
+ } catch (Exception e) {
+ logger.error("Could not create async event queue: {}", e.getMessage(), e);
+ context.getResultSender().lastResult(new CliFunctionResult(memberId, e, null));
}
}
- private Class<?> forName(String className, String neededFor) {
+ private Object newInstance(String className)
+ throws ClassNotFoundException, IllegalAccessException, InstantiationException {
if (Strings.isNullOrEmpty(className)) {
return null;
}
- try {
- return ClassPathLoader.getLatest().forName(className);
- } catch (ClassNotFoundException e) {
- throw new RuntimeException(CliStrings.format(
- CliStrings.CREATE_ASYNC_EVENT_QUEUE__MSG__COULD_NOT_FIND_CLASS_0_SPECIFIED_FOR_1,
- className, neededFor), e);
- } catch (ClassCastException e) {
- throw new RuntimeException(CliStrings.format(
- CliStrings.CREATE_ASYNC_EVENT_QUEUE__MSG__CLASS_0_SPECIFIED_FOR_1_IS_NOT_OF_EXPECTED_TYPE,
- className, neededFor), e);
- }
- }
-
- private static Object newInstance(Class<?> klass, String neededFor) {
- try {
- return klass.newInstance();
- } catch (InstantiationException e) {
- throw new RuntimeException(CliStrings.format(
- CliStrings.CREATE_ASYNC_EVENT_QUEUE__MSG__COULD_NOT_INSTANTIATE_CLASS_0_SPECIFIED_FOR_1,
- klass, neededFor), e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(CliStrings.format(
- CliStrings.CREATE_ASYNC_EVENT_QUEUE__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1, klass,
- neededFor), e);
- }
+ return ClassPathLoader.getLatest().forName(className).newInstance();
}
@Override
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommandDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommandDUnitTest.java
new file mode 100644
index 0000000..cb93b2b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommandDUnitTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.distributed.internal.ClusterConfigurationService;
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
+import org.apache.geode.management.internal.configuration.domain.Configuration;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+
+@Category(DistributedTest.class)
+public class CreateAsyncEventQueueCommandDUnitTest {
+
+
+ public static final String COMMAND = "create async-event-queue ";
+ public static final String VALID_COMMAND =
+ COMMAND + "--listener=" + MyAsyncEventListener.class.getName();
+
+ @Rule
+ public LocatorServerStartupRule lsRule = new LocatorServerStartupRule();
+
+ @Rule
+ public GfshCommandRule gfsh = new GfshCommandRule();
+
+ private static MemberVM locator, server;
+
+ @Test
+ public void createQueueWithInvalidClass() throws Exception {
+ server = lsRule.startServerAsJmxManager(0);
+ gfsh.connectAndVerify(server.getJmxPort(), GfshCommandRule.PortType.jmxManager);
+ IgnoredException.addIgnoredException("java.lang.ClassNotFoundException: xyz");
+ gfsh.executeAndAssertThat(COMMAND + " --id=queue --listener=xyz").statusIsSuccess()
+ .tableHasRowCount("Member", 1).tableHasRowWithValues("Member", "Result", "server-0",
+ "ERROR: java.lang.ClassNotFoundException: xyz");
+ }
+
+ @Test
+ public void createQueueWithoutCC() throws Exception {
+ server = lsRule.startServerAsJmxManager(0);
+ gfsh.connectAndVerify(server.getJmxPort(), GfshCommandRule.PortType.jmxManager);
+ gfsh.executeAndAssertThat(VALID_COMMAND + " --id=queue").statusIsSuccess()
+ .containsOutput("Failed to persist the configuration")
+ .tableHasColumnWithExactValuesInAnyOrder("Result", "Success").tableHasRowCount("Member", 1);
+ }
+
+ @Test
+ public void create_sync_event_queue() throws Exception {
+ locator = lsRule.startLocatorVM(0);
+ lsRule.startServerVM(1, "group1", locator.getPort());
+ lsRule.startServerVM(2, "group2", locator.getPort());
+ gfsh.connectAndVerify(locator);
+ // verify a simple create aeq command
+ gfsh.executeAndAssertThat(VALID_COMMAND + " --id=queue").statusIsSuccess()
+ .tableHasRowCount("Member", 2)
+ .tableHasColumnWithExactValuesInAnyOrder("Result", "Success", "Success");
+
+ IgnoredException
+ .addIgnoredException("java.lang.IllegalStateException: A GatewaySender with id "
+ + "AsyncEventQueue_queue is already defined in this cache.");
+ // creating a queue with an id that already exists fails on every member
+ gfsh.executeAndAssertThat(VALID_COMMAND + " --id=queue").statusIsSuccess()
+ .tableHasRowCount("Member", 2).tableHasColumnWithExactValuesInAnyOrder("Result",
+ "ERROR: java.lang.IllegalStateException: A GatewaySender with id AsyncEventQueue_queue is already defined in this cache.",
+ "ERROR: java.lang.IllegalStateException: A GatewaySender with id AsyncEventQueue_queue is already defined in this cache.");
+
+ gfsh.executeAndAssertThat("create disk-store --name=diskStore2 --dir=diskstore");
+ locator.waitTillDiskstoreIsReady("diskStore2", 2);
+
+ // create another queue with different configuration
+ gfsh.executeAndAssertThat(VALID_COMMAND + " --id=queue2 --group=group2 "
+ + "--batch-size=1024 --persistent --disk-store=diskStore2 "
+ + "--max-queue-memory=512 --listener-param=param1,param2#value2").statusIsSuccess()
+ .tableHasRowCount("Member", 1);
+
+
+ // list the queue to verify the result
+ gfsh.executeAndAssertThat("list async-event-queue").statusIsSuccess()
+ .tableHasRowCount("Member", 3).tableHasRowWithValues("Member", "ID", "Batch Size",
+ "Persistent", "Disk Store", "Max Memory", "server-2", "queue2", "1024", "true",
+ "diskStore2", "512");
+ }
+
+ @Test
+ public void create_queue_updates_cc() throws Exception {
+ locator = lsRule.startLocatorVM(0);
+ server = lsRule.startServerVM(1, locator.getPort());
+ gfsh.connectAndVerify(locator);
+
+ locator.invoke(() -> {
+ ClusterConfigurationService service =
+ LocatorServerStartupRule.getLocator().getSharedConfiguration();
+ assertThat(service.getConfiguration("cluster").getCacheXmlContent()).isNull();
+ });
+
+ gfsh.executeAndAssertThat(VALID_COMMAND + " --id=queue").statusIsSuccess()
+ .tableHasRowCount("Member", 1).tableHasColumnWithExactValuesInAnyOrder("Result", "Success");
+
+ locator.invoke(() -> {
+ ClusterConfigurationService service =
+ LocatorServerStartupRule.getLocator().getSharedConfiguration();
+ Configuration configuration = service.getConfiguration("cluster");
+ configuration.getCacheXmlContent().contains("id=queue");
+ });
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommandTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommandTest.java
new file mode 100644
index 0000000..3fd59af
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateAsyncEventQueueCommandTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static io.codearte.catchexception.shade.mockito.Matchers.any;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__PARALLEL;
+import static org.apache.geode.management.internal.cli.i18n.CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.distributed.internal.ClusterConfigurationService;
+import org.apache.geode.management.internal.cli.GfshParseResult;
+import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
+import org.apache.geode.management.internal.configuration.domain.XmlEntity;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+
+
+@Category(UnitTest.class)
+public class CreateAsyncEventQueueCommandTest {
+
+ public static final String COMMAND = "create async-event-queue ";
+ public static final String MINIUM_COMMAND = COMMAND + "--id=id --listener=xyz";
+ @ClassRule
+ public static GfshParserRule gfsh = new GfshParserRule();
+
+ private CreateAsyncEventQueueCommand command;
+ private ClusterConfigurationService service;
+
+ @Before
+ public void before() throws Exception {
+ command = spy(CreateAsyncEventQueueCommand.class);
+ service = mock(ClusterConfigurationService.class);
+ doReturn(service).when(command).getSharedConfiguration();
+ }
+
+ @Test
+ public void mandatoryId() throws Exception {
+ gfsh.executeAndAssertThat(command, COMMAND + "--listener=xyz").statusIsError()
+ .containsOutput("Invalid command");
+ }
+
+ @Test
+ public void mandatoryListener() throws Exception {
+ gfsh.executeAndAssertThat(command, COMMAND + "--id=id").statusIsError()
+ .containsOutput("Invalid command");
+ }
+
+ @Test
+ public void cannotCreateAEQOnOneMember() throws Exception {
+ // An AEQ cannot be created on a single member, since creating it needs to update the CC.
+ // This test makes sure the --member option is not accepted by this command.
+ gfsh.executeAndAssertThat(command, COMMAND + "--id=id --listener=xyz --member=xyz")
+ .statusIsError().containsOutput("Invalid command");
+ }
+
+ @Test
+ public void defaultValues() throws Exception {
+ GfshParseResult result = gfsh.parse(MINIUM_COMMAND);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL)).isEqualTo(5);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE)).isEqualTo(100);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY)).isEqualTo(100);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS)).isEqualTo(1);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS)).isEqualTo(true);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__PERSISTENT)).isEqualTo(false);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION))
+ .isEqualTo(false);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__PARALLEL)).isEqualTo(false);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY))
+ .isEqualTo(false);
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY)).isEqualTo("KEY");
+
+ result = gfsh.parse(COMMAND + "--id=id --listener=xyz --forward-expiration-destroy");
+ assertThat(result.getParamValue(CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY))
+ .isEqualTo(true);
+ }
+
+ @Test
+ public void noMemberFound() throws Exception {
+ doReturn(Collections.emptySet()).when(command).findMembers(any(), any());
+ gfsh.executeAndAssertThat(command, MINIUM_COMMAND).statusIsError()
+ .containsOutput("No Members Found");
+ }
+
+ @Test
+ public void buildResult_all_success() throws Exception {
+ List<CliFunctionResult> functionResults = new ArrayList<>();
+ XmlEntity xmlEntity = mock(XmlEntity.class);
+ functionResults.add(new CliFunctionResult("member1", xmlEntity, "SUCCESS"));
+ functionResults.add(new CliFunctionResult("member2", xmlEntity, "SUCCESS"));
+
+ // this is only to make the code pass that member check
+ doReturn(Collections.emptySet()).when(command).getMembers(any(), any());
+ doReturn(functionResults).when(command).execute(isA(Function.class), isA(Object.class),
+ isA(Set.class));
+
+ gfsh.executeAndAssertThat(command, MINIUM_COMMAND).statusIsSuccess().persisted()
+ .tableHasRowCount("Member", 2)
+ .tableHasRowWithValues("Member", "Result", "member1", "SUCCESS")
+ .tableHasRowWithValues("Member", "Result", "member2", "SUCCESS");
+
+ // addXmlEntity should only be called once
+ verify(service).addXmlEntity(xmlEntity, null);
+ }
+
+
+ @Test
+ public void buildResult_all_failure() throws Exception {
+ List<CliFunctionResult> functionResults = new ArrayList<>();
+ XmlEntity xmlEntity = mock(XmlEntity.class);
+ functionResults.add(new CliFunctionResult("member1", false, "failed"));
+ functionResults
+ .add(new CliFunctionResult("member2", new RuntimeException("exception happened"), null));
+
+ // this is only to make the code pass that member check
+ doReturn(Collections.emptySet()).when(command).getMembers(any(), any());
+ doReturn(functionResults).when(command).execute(isA(Function.class), isA(Object.class),
+ isA(Set.class));
+
+ gfsh.executeAndAssertThat(command, MINIUM_COMMAND).statusIsSuccess().persisted() // need to make
+ // sure
+ // failToPersist
+ // flag is not
+ // set, so that
+ // we won't
+ // print out
+ // warning
+ // messages.
+ .tableHasRowCount("Member", 2)
+ .tableHasRowWithValues("Member", "Result", "member1", "ERROR: failed")
+ .tableHasRowWithValues("Member", "Result", "member2",
+ "ERROR: java.lang.RuntimeException: exception happened");
+
+ // addXmlEntity should not be called
+ verify(service, times(0)).addXmlEntity(xmlEntity, null);
+ }
+
+ @Test
+ public void buildResult_one_failure_one_success() throws Exception {
+ List<CliFunctionResult> functionResults = new ArrayList<>();
+ XmlEntity xmlEntity = mock(XmlEntity.class);
+ functionResults.add(new CliFunctionResult("member1", xmlEntity, "SUCCESS"));
+ functionResults
+ .add(new CliFunctionResult("member2", new RuntimeException("exception happened"), null));
+
+ // this is only to make the code pass that member check
+ doReturn(Collections.emptySet()).when(command).getMembers(any(), any());
+ doReturn(functionResults).when(command).execute(isA(Function.class), isA(Object.class),
+ isA(Set.class));
+
+ gfsh.executeAndAssertThat(command, MINIUM_COMMAND).statusIsSuccess().persisted()
+ .tableHasRowCount("Member", 2)
+ .tableHasRowWithValues("Member", "Result", "member1", "SUCCESS").tableHasRowWithValues(
+ "Member", "Result", "member2", "ERROR: java.lang.RuntimeException: exception happened");
+
+ // addXmlEntity should be called once
+ verify(service).addXmlEntity(xmlEntity, null);
+ }
+
+ @Test
+ public void command_succeeded_but_no_cluster_config_service() throws Exception {
+ doReturn(null).when(command).getSharedConfiguration();
+ doReturn(Collections.emptySet()).when(command).getMembers(any(), any());
+
+ List<CliFunctionResult> functionResults = new ArrayList<>();
+ XmlEntity xmlEntity = mock(XmlEntity.class);
+ functionResults.add(new CliFunctionResult("member1", xmlEntity, "SUCCESS"));
+ doReturn(functionResults).when(command).execute(isA(Function.class), isA(Object.class),
+ isA(Set.class));
+
+ gfsh.executeAndAssertThat(command, MINIUM_COMMAND).statusIsSuccess().failToPersist();
+
+ // addXmlEntity should not be called
+ verify(service, times(0)).addXmlEntity(xmlEntity, null);
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DestroyGatewaySenderCommandTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DestroyGatewaySenderCommandTest.java
index 78726ee..d331c17 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DestroyGatewaySenderCommandTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/DestroyGatewaySenderCommandTest.java
@@ -102,7 +102,8 @@ public class DestroyGatewaySenderCommandTest {
doReturn(mock(Set.class)).when(command).getMembers(any(), any());
parser.executeAndAssertThat(command, "destroy gateway-sender --id=1").statusIsError()
- .tableHasColumnWithValuesContaining("Status", "result1", "ERROR: something happened");
+ .tableHasColumnWithValuesContaining("Status", "result1",
+ "ERROR: java.lang.Exception: something happened");
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ListAsyncEventQueuesCommandDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ListAsyncEventQueuesCommandDUnitTest.java
new file mode 100644
index 0000000..1065848
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/ListAsyncEventQueuesCommandDUnitTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import static org.apache.geode.test.junit.rules.GfshCommandRule.PortType.jmxManager;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
+import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+
+/**
+ * Distributed test for the gfsh {@code list async-event-queue} command.
+ *
+ * <p>
+ * Starts one locator VM and two server VMs (in groups {@code group1} and {@code group2}), creates
+ * async event queues through {@code create async-event-queue}, and checks the rows reported by
+ * the list command after each step.
+ *
+ * <p>
+ * The test launches member VMs through {@link LocatorServerStartupRule}, so it is categorized as
+ * a {@link DistributedTest} rather than a unit test.
+ */
+@Category(DistributedTest.class)
+public class ListAsyncEventQueuesCommandDUnitTest {
+
+  @ClassRule
+  public static LocatorServerStartupRule lsRule = new LocatorServerStartupRule();
+
+  // The bound method reference reads the static locator field when this instance field is
+  // initialized. JUnit constructs the test instance only after beforeClass() has run, so the
+  // locator has already been assigned by then.
+  @Rule
+  public GfshCommandRule gfsh = new GfshCommandRule(locator::getJmxPort, jmxManager);
+
+  private static MemberVM locator;
+
+  /**
+   * Starts the locator (VM 0) and two servers (VM 1 in group1, VM 2 in group2) once for the class.
+   */
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    locator = lsRule.startLocatorVM(0);
+    lsRule.startServerVM(1, "group1", locator.getPort());
+    lsRule.startServerVM(2, "group2", locator.getPort());
+  }
+
+  /**
+   * Lists queues on an empty cluster, then after creating one queue per group, then after
+   * creating a queue without a group option.
+   */
+  @Test
+  public void list() throws Exception {
+    // nothing has been created yet
+    gfsh.executeAndAssertThat("list async-event-queue").statusIsSuccess()
+        .containsOutput("No Async Event Queues Found");
+
+    // one queue per server group
+    gfsh.executeAndAssertThat("create async-event-queue --id=queue1 --group=group1 --listener="
+        + MyAsyncEventListener.class.getName()).statusIsSuccess();
+
+    gfsh.executeAndAssertThat("create async-event-queue --id=queue2 --group=group2 --listener="
+        + MyAsyncEventListener.class.getName()).statusIsSuccess();
+
+    // each group-scoped queue is expected on exactly one server
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue1", 1);
+    locator.waitTillAsyncEventQueuesAreReadyOnServers("queue2", 1);
+
+    gfsh.executeAndAssertThat("list async-event-queue").statusIsSuccess()
+        .tableHasRowCount("Member", 2).tableHasRowWithValues("Member", "ID", "server-1", "queue1")
+        .tableHasRowWithValues("Member", "ID", "server-2", "queue2");
+
+    // create another async event queue without a group option, i.e. on the entire cluster, and
+    // verify that the list command reports every queue on every server
+    gfsh.executeAndAssertThat(
+        "create async-event-queue --id=queue --listener=" + MyAsyncEventListener.class.getName())
+        .statusIsSuccess();
+
+    // two group-scoped queues plus the cluster-wide queue on both servers: four rows
+    gfsh.executeAndAssertThat("list async-event-queue").statusIsSuccess()
+        .tableHasRowCount("Member", 4).tableHasRowWithValues("Member", "ID", "server-1", "queue1")
+        .tableHasRowWithValues("Member", "ID", "server-2", "queue2")
+        .tableHasRowWithValues("Member", "ID", "server-1", "queue")
+        .tableHasRowWithValues("Member", "ID", "server-2", "queue");
+
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/QueueCommandsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/QueueCommandsDUnitTest.java
deleted file mode 100644
index 70991c1..0000000
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/QueueCommandsDUnitTest.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.management.internal.cli.commands;
-
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION;
-import static org.apache.geode.distributed.ConfigurationProperties.GROUPS;
-import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_BIND_ADDRESS;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START;
-import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.NAME;
-import static org.apache.geode.distributed.ConfigurationProperties.USE_CLUSTER_CONFIGURATION;
-import static org.apache.geode.test.dunit.Assert.assertEquals;
-import static org.apache.geode.test.dunit.Assert.assertFalse;
-import static org.apache.geode.test.dunit.Assert.assertNotNull;
-import static org.apache.geode.test.dunit.Assert.assertTrue;
-import static org.apache.geode.test.dunit.Assert.fail;
-import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
-import static org.apache.geode.test.dunit.Wait.waitForCriterion;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.file.Files;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.ClusterConfigurationService;
-import org.apache.geode.distributed.internal.InternalLocator;
-import org.apache.geode.internal.AvailablePort;
-import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.management.cli.Result;
-import org.apache.geode.management.internal.cli.i18n.CliStrings;
-import org.apache.geode.management.internal.cli.result.CommandResult;
-import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
-import org.apache.geode.test.compiler.ClassBuilder;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.SerializableRunnable;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.WaitCriterion;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
-
-/**
- * A distributed test suite of test cases for testing the queue commands that are part of Gfsh.
- *
- * @since GemFire 8.0
- */
-@Category({DistributedTest.class, FlakyTest.class}) // GEODE-1429 GEODE-1976 GEODE-3530
-@SuppressWarnings("serial")
-public class QueueCommandsDUnitTest extends CliCommandTestBase {
-
- private final List<String> filesToBeDeleted = new CopyOnWriteArrayList<>();
-
- @Override
- public final void preSetUp() throws Exception {
- disconnectAllFromDS();
- }
-
- @Test // FlakyTest: GEODE-1429
- public void testAsyncEventQueue() throws IOException {
- final String queue1Name = "testAsyncEventQueue1";
- final String queue2Name = "testAsyncEventQueue2";
- final String diskStoreName = "testAsyncEventQueueDiskStore";
-
- Properties localProps = new Properties();
- localProps.setProperty(GROUPS, "Group0");
- setUpJmxManagerOnVm0ThenConnect(localProps);
-
- CommandResult cmdResult = executeCommand(CliStrings.LIST_ASYNC_EVENT_QUEUES);
- assertEquals(Result.Status.OK, cmdResult.getStatus());
- assertTrue(commandResultToString(cmdResult).contains("No Async Event Queues Found"));
-
- final VM vm1 = Host.getHost(0).getVM(1);
- final String vm1Name = "VM" + vm1.getId();
- final File diskStoreDir = new File(new File(".").getAbsolutePath(), diskStoreName);
- this.filesToBeDeleted.add(diskStoreDir.getAbsolutePath());
- vm1.invoke(new SerializableRunnable() {
- public void run() {
- diskStoreDir.mkdirs();
-
- Properties localProps = new Properties();
- localProps.setProperty(NAME, vm1Name);
- localProps.setProperty(GROUPS, "Group1");
- getSystem(localProps);
- getCache();
- }
- });
-
- final VM vm2 = Host.getHost(0).getVM(2);
- final String vm2Name = "VM" + vm2.getId();
- vm2.invoke(new SerializableRunnable() {
- public void run() {
- Properties localProps = new Properties();
- localProps.setProperty(NAME, vm2Name);
- localProps.setProperty(GROUPS, "Group2");
- getSystem(localProps);
- getCache();
- }
- });
-
- // Deploy a JAR file with an
- // AsyncEventListener/GatewayEventFilter/GatewayEventSubstitutionFilter
- // that can be instantiated on each server
- final File jarFile = new File(new File(".").getAbsolutePath(), "QueueCommandsDUnit.jar");
- QueueCommandsDUnitTest.this.filesToBeDeleted.add(jarFile.getAbsolutePath());
-
- ClassBuilder classBuilder = new ClassBuilder();
- byte[] jarBytes =
- classBuilder.createJarFromClassContent("com/qcdunit/QueueCommandsDUnitTestHelper",
- "package com.qcdunit;" + "import java.util.List; import java.util.Properties;"
- + "import org.apache.geode.internal.cache.xmlcache.Declarable2; import org.apache.geode.cache.asyncqueue.AsyncEvent;"
- + "import org.apache.geode.cache.wan.GatewayEventFilter; import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;"
- + "import org.apache.geode.cache.asyncqueue.AsyncEventListener; import org.apache.geode.cache.wan.GatewayQueueEvent;"
- + "import org.apache.geode.cache.EntryEvent;"
- + "public class QueueCommandsDUnitTestHelper implements Declarable2, GatewayEventFilter, GatewayEventSubstitutionFilter, AsyncEventListener {"
- + "Properties props;"
- + "public boolean processEvents(List<AsyncEvent> events) { return true; }"
- + "public void afterAcknowledgement(GatewayQueueEvent event) {}"
- + "public boolean beforeEnqueue(GatewayQueueEvent event) { return true; }"
- + "public boolean beforeTransmit(GatewayQueueEvent event) { return true; }"
- + "public Object getSubstituteValue(EntryEvent event) { return null; }"
- + "public void close() {}"
- + "public void init(final Properties props) {this.props = props;}"
- + "public Properties getConfig() {return this.props;}}");
- writeJarBytesToFile(jarFile, jarBytes);
-
- cmdResult = executeCommand("deploy --jar=QueueCommandsDUnit.jar");
- assertEquals(Result.Status.OK, cmdResult.getStatus());
-
- CommandStringBuilder commandStringBuilder =
- new CommandStringBuilder(CliStrings.CREATE_DISK_STORE);
- commandStringBuilder.addOption(CliStrings.CREATE_DISK_STORE__NAME, diskStoreName);
- commandStringBuilder.addOption(CliStrings.GROUP, "Group1");
- commandStringBuilder.addOption(CliStrings.CREATE_DISK_STORE__DIRECTORY_AND_SIZE,
- diskStoreDir.getAbsolutePath());
- cmdResult = executeCommand(commandStringBuilder.toString());
- assertEquals(Result.Status.OK, cmdResult.getStatus());
- String stringResult = commandResultToString(cmdResult);
- assertEquals(3, countLinesInString(stringResult, false));
- assertEquals(false, stringResult.contains("ERROR"));
- assertTrue(stringContainsLine(stringResult, vm1Name + ".*Success"));
-
- commandStringBuilder = new CommandStringBuilder(CliStrings.CREATE_ASYNC_EVENT_QUEUE);
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ID, queue1Name);
- commandStringBuilder.addOption(CliStrings.GROUP, "Group1");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCH_SIZE, "514");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__PERSISTENT, "true");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISK_STORE, diskStoreName);
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY,
- "213");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL, "946");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__PARALLEL, "true");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ENABLEBATCHCONFLATION,
- "true");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISPATCHERTHREADS, "2");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ORDERPOLICY, "PARTITION");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__GATEWAYEVENTFILTER,
- "com.qcdunit.QueueCommandsDUnitTestHelper");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__SUBSTITUTION_FILTER,
- "com.qcdunit.QueueCommandsDUnitTestHelper");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__DISKSYNCHRONOUS, "false");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__FORWARD_EXPIRATION_DESTROY,
- "true");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER,
- "com.qcdunit.QueueCommandsDUnitTestHelper");
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER_PARAM_AND_VALUE,
- "param1,param2#value2");
- cmdResult = executeCommand(commandStringBuilder.toString());
- assertEquals(Result.Status.OK, cmdResult.getStatus());
- stringResult = commandResultToString(cmdResult);
- assertEquals(3, countLinesInString(stringResult, false));
- assertEquals(false, stringResult.contains("ERROR"));
- assertTrue(stringContainsLine(stringResult, vm1Name + ".*Success"));
-
- // Verify that the queue was created on the correct member
- cmdResult = executeCommand(CliStrings.LIST_ASYNC_EVENT_QUEUES);
- assertEquals(Result.Status.OK, cmdResult.getStatus());
- stringResult = commandResultToString(cmdResult);
- assertEquals(3, countLinesInString(stringResult, false));
- assertTrue(stringContainsLine(stringResult, vm1Name + " .*" + queue1Name + " .*514 .*true .*"
- + diskStoreName + " .*213 .*" + " .*com.qcdunit.QueueCommandsDUnitTestHelper" + ".*"));
- assertTrue(stringContainsLine(stringResult, vm1Name + ".*param2=value2.*"));
- assertTrue(stringContainsLine(stringResult, vm1Name + ".*param1=[^\\w].*"));
- assertFalse(stringContainsLine(stringResult, vm2Name + ".*" + queue1Name + ".*"));
-
- vm1.invoke(new SerializableRunnable() {
- public void run() {
- Cache cache = getCache();
- AsyncEventQueue queue = cache.getAsyncEventQueue(queue1Name);
- assertEquals(queue.getBatchSize(), 514);
- assertEquals(queue.isPersistent(), true);
- assertEquals(queue.getDiskStoreName(), diskStoreName);
- assertEquals(queue.getMaximumQueueMemory(), 213);
- assertEquals(queue.getBatchTimeInterval(), 946);
- assertEquals(queue.isParallel(), true);
- assertEquals(queue.isBatchConflationEnabled(), true);
- assertEquals(queue.getDispatcherThreads(), 2);
- assertEquals(queue.getOrderPolicy().toString(), "PARTITION");
- assertEquals(queue.getGatewayEventFilters().size(), 1);
- assertEquals(queue.getGatewayEventFilters().get(0).getClass().getName(),
- "com.qcdunit.QueueCommandsDUnitTestHelper");
- assertEquals(queue.getGatewayEventSubstitutionFilter().getClass().getName(),
- "com.qcdunit.QueueCommandsDUnitTestHelper");
- assertEquals(queue.isDiskSynchronous(), false);
- assertEquals(queue.isForwardExpirationDestroy(), true);
- assertEquals(queue.getAsyncEventListener().getClass().getName(),
- "com.qcdunit.QueueCommandsDUnitTestHelper");
- }
- });
-
- commandStringBuilder = new CommandStringBuilder(CliStrings.CREATE_ASYNC_EVENT_QUEUE);
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ID, queue2Name);
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER,
- "com.qcdunit.QueueCommandsDUnitTestHelper");
- cmdResult = executeCommand(commandStringBuilder.toString());
- assertEquals(Result.Status.OK, cmdResult.getStatus());
- stringResult = commandResultToString(cmdResult);
- assertEquals(5, countLinesInString(stringResult, false));
- assertTrue(stringContainsLine(stringResult, "Manager.*Success"));
- assertTrue(stringContainsLine(stringResult, vm2Name + ".*Success"));
- assertTrue(stringContainsLine(stringResult, vm1Name + ".*Success"));
-
- // Verify that the queue was created on the correct members
- cmdResult = executeCommand(CliStrings.LIST_ASYNC_EVENT_QUEUES);
- assertEquals(Result.Status.OK, cmdResult.getStatus());
- stringResult = commandResultToString(cmdResult);
- assertEquals(6, countLinesInString(stringResult, false));
- assertTrue(stringContainsLine(stringResult, "Manager .*" + queue2Name
- + " .*100 .*false .*null .*100 .*" + " .*com.qcdunit.QueueCommandsDUnitTestHelper"));
- assertTrue(stringContainsLine(stringResult, vm1Name + " .*" + queue1Name + " .*514 .*true .*"
- + diskStoreName + " .*213 .*" + " .*com.qcdunit.QueueCommandsDUnitTestHelper" + ".*"));
- assertTrue(stringContainsLine(stringResult, vm1Name + " .*" + queue2Name
- + " .*100 .*false .*null .*100 .*" + " .*com.qcdunit.QueueCommandsDUnitTestHelper"));
- assertTrue(stringContainsLine(stringResult, vm2Name + " .*" + queue2Name
- + " .*100 .*false .*null .*100 .*" + " .*com.qcdunit.QueueCommandsDUnitTestHelper"));
- }
-
- /**
- * Asserts that creating async event queues correctly updates the shared configuration.
- */
- @Test // FlakyTest: GEODE-1976
- public void testCreateUpdatesSharedConfig() throws IOException {
- disconnectAllFromDS();
- final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
- jmxPort = ports[0];
- httpPort = ports[1];
- try {
- jmxHost = InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException ignore) {
- jmxHost = "localhost";
- }
-
- final String queueName = "testAsyncEventQueueQueue";
- final String groupName = "testAsyncEventQueueSharedConfigGroup";
-
- final Properties locatorProps = new Properties();
- locatorProps.setProperty(NAME, "Locator");
- locatorProps.setProperty(MCAST_PORT, "0");
- locatorProps.setProperty(LOG_LEVEL, "fine");
- locatorProps.setProperty(ENABLE_CLUSTER_CONFIGURATION, "true");
- locatorProps.setProperty(JMX_MANAGER, "true");
- locatorProps.setProperty(JMX_MANAGER_START, "true");
- locatorProps.setProperty(JMX_MANAGER_BIND_ADDRESS, String.valueOf(jmxHost));
- locatorProps.setProperty(JMX_MANAGER_PORT, String.valueOf(jmxPort));
- locatorProps.setProperty(HTTP_SERVICE_PORT, String.valueOf(httpPort));
-
- // Start the Locator and wait for shared configuration to be available
- final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
- Host.getHost(0).getVM(0).invoke(new SerializableRunnable() {
- @Override
- public void run() {
- final File locatorLogFile = new File("locator-" + locatorPort + ".log");
-
- try {
- final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locatorPort,
- locatorLogFile, null, locatorProps);
-
- WaitCriterion wc = new WaitCriterion() {
- @Override
- public boolean done() {
- return locator.isSharedConfigurationRunning();
- }
-
- @Override
- public String description() {
- return "Waiting for shared configuration to be started";
- }
- };
- waitForCriterion(wc, 5000, 500, true);
- } catch (IOException ioex) {
- fail("Unable to create a locator with a shared configuration");
- }
- }
- });
-
- connect(jmxHost, jmxPort, httpPort, getDefaultShell());
-
- // Create a cache in VM 1
- VM vm = Host.getHost(0).getVM(1);
- vm.invoke(new SerializableRunnable() {
- @Override
- public void run() {
- Properties localProps = new Properties();
- localProps.setProperty(MCAST_PORT, "0");
- localProps.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
- localProps.setProperty(GROUPS, groupName);
- getSystem(localProps);
- assertNotNull(getCache());
- }
- });
-
- // Deploy a JAR file with an AsyncEventListener that can be instantiated on each server
- final File jarFile = new File(new File(".").getAbsolutePath(), "QueueCommandsDUnit.jar");
- QueueCommandsDUnitTest.this.filesToBeDeleted.add(jarFile.getAbsolutePath());
-
- ClassBuilder classBuilder = new ClassBuilder();
- byte[] jarBytes =
- classBuilder.createJarFromClassContent("com/qcdunit/QueueCommandsDUnitTestListener",
- "package com.qcdunit;" + "import java.util.List; import java.util.Properties;"
- + "import org.apache.geode.internal.cache.xmlcache.Declarable2; import org.apache.geode.cache.asyncqueue.AsyncEvent;"
- + "import org.apache.geode.cache.asyncqueue.AsyncEventListener;"
- + "public class QueueCommandsDUnitTestListener implements Declarable2, AsyncEventListener {"
- + "Properties props;"
- + "public boolean processEvents(List<AsyncEvent> events) { return true; }"
- + "public void close() {}"
- + "public void init(final Properties props) {this.props = props;}"
- + "public Properties getConfig() {return this.props;}}");
- writeJarBytesToFile(jarFile, jarBytes);
-
- CommandResult cmdResult = executeCommand("deploy --jar=QueueCommandsDUnit.jar");
- assertEquals(Result.Status.OK, cmdResult.getStatus());
-
- // Test creating the queue
- CommandStringBuilder commandStringBuilder =
- new CommandStringBuilder(CliStrings.CREATE_ASYNC_EVENT_QUEUE);
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__ID, queueName);
- commandStringBuilder.addOption(CliStrings.GROUP, groupName);
- commandStringBuilder.addOption(CliStrings.CREATE_ASYNC_EVENT_QUEUE__LISTENER,
- "com.qcdunit.QueueCommandsDUnitTestListener");
- cmdResult = executeCommand(commandStringBuilder.toString());
- assertEquals(Result.Status.OK, cmdResult.getStatus());
-
- // Make sure the queue exists in the shared config
- Host.getHost(0).getVM(0).invoke(new SerializableRunnable() {
- @Override
- public void run() {
- ClusterConfigurationService sharedConfig =
- ((InternalLocator) Locator.getLocator()).getSharedConfiguration();
- String xmlFromConfig;
- try {
- xmlFromConfig = sharedConfig.getConfiguration(groupName).getCacheXmlContent();
- assertTrue(xmlFromConfig.contains(queueName));
- } catch (Exception e) {
- fail("Error occurred in cluster configuration service", e);
- }
- }
- });
-
- // Close cache in the vm1 and restart it to get the shared configuration
- vm = Host.getHost(0).getVM(1);
- vm.invoke(new SerializableRunnable() {
- @Override
- public void run() {
- Cache cache = getCache();
- assertNotNull(cache);
- cache.close();
-
- assertTrue(cache.isClosed());
-
- Properties localProps = new Properties();
- localProps.setProperty(MCAST_PORT, "0");
- localProps.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
- localProps.setProperty(GROUPS, groupName);
- localProps.setProperty(USE_CLUSTER_CONFIGURATION, "true");
- getSystem(localProps);
- cache = getCache();
- assertNotNull(cache);
- AsyncEventQueue aeq = cache.getAsyncEventQueue(queueName);
-
- assertNotNull(aeq);
- }
- });
- }
-
- @Override
- protected final void preTearDownCliCommandTestBase() throws Exception {
- for (String path : this.filesToBeDeleted) {
- try {
- final File fileToDelete = new File(path);
- if (fileToDelete.isDirectory())
- FileUtils.deleteDirectory(fileToDelete);
- else
- Files.delete(fileToDelete.toPath());
- if (path.endsWith(".jar")) {
- executeCommand("undeploy --jar=" + fileToDelete.getName());
- }
- } catch (IOException e) {
- getLogWriter().error("Unable to delete file", e);
- }
- }
- this.filesToBeDeleted.clear();
- }
-
- private void writeJarBytesToFile(File jarFile, byte[] jarBytes) throws IOException {
- final OutputStream outStream = new FileOutputStream(jarFile);
- outStream.write(jarBytes);
- outStream.close();
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/CliFunctionResultTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/CliFunctionResultTest.java
index 2f0946b..4a4cd91 100644
--- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/CliFunctionResultTest.java
+++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/functions/CliFunctionResultTest.java
@@ -37,6 +37,6 @@ public class CliFunctionResultTest {
assertThat(result.getErrorMessage()).isEqualTo("message");
result = new CliFunctionResult("memberName", new Exception("exception message"), null);
- assertThat(result.getErrorMessage()).isEqualTo("exception message");
+ assertThat(result.getErrorMessage()).isEqualTo("java.lang.Exception: exception message");
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/test/junit/assertions/CommandResultAssert.java b/geode-core/src/test/java/org/apache/geode/test/junit/assertions/CommandResultAssert.java
index 4b0a43b..382b65c 100644
--- a/geode-core/src/test/java/org/apache/geode/test/junit/assertions/CommandResultAssert.java
+++ b/geode-core/src/test/java/org/apache/geode/test/junit/assertions/CommandResultAssert.java
@@ -101,6 +101,16 @@ public class CommandResultAssert
return this;
}
+ /**
+  * Verifies that the command result's {@code failedToPersist()} flag is set.
+  */
+ public CommandResultAssert failToPersist() {
+   boolean persistFailed = actual.getCommandResult().failedToPersist();
+   Assertions.assertThat(persistFailed).isTrue();
+   return this;
+ }
+
+ /**
+  * Verifies that the command result's {@code failedToPersist()} flag is not set.
+  */
+ public CommandResultAssert persisted() {
+   boolean persistFailed = actual.getCommandResult().failedToPersist();
+   Assertions.assertThat(persistFailed).isFalse();
+   return this;
+ }
+
/**
* Verifies that gfsh executed with status ERROR
*/
@@ -195,7 +205,7 @@ public class CommandResultAssert
for (int rowIndex = 0; rowIndex < numberOfRows; rowIndex++) {
Object[] rowValues = new Object[headers.length];
for (int columnIndex = 0; columnIndex < headers.length; columnIndex++) {
- rowValues[columnIndex] = allValues.get(headers[columnIndex]).get(rowIndex);
+ rowValues[columnIndex] = allValues.get(headers[columnIndex]).get(rowIndex).toString();
}
// check if entire row is equal, but if not, continue to next row
@@ -205,7 +215,7 @@ public class CommandResultAssert
}
// did not find any matching rows, then this would pass only if we do not pass in any values
- assertThat(headersThenValues.length).isEqualTo(0);
+ assertThat(headersThenValues.length).describedAs("No matching row found.").isEqualTo(0);
return this;
}
diff --git a/geode-core/src/test/java/org/apache/geode/test/junit/rules/GfshParserRule.java b/geode-core/src/test/java/org/apache/geode/test/junit/rules/GfshParserRule.java
index a6e181d..3507479 100644
--- a/geode-core/src/test/java/org/apache/geode/test/junit/rules/GfshParserRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/junit/rules/GfshParserRule.java
@@ -53,6 +53,9 @@ public class GfshParserRule extends ExternalResource {
return parser.parse(command);
}
+ /**
+ * @deprecated use executeAndAssertThat instead
+ */
public <T> CommandResult executeCommandWithInstance(T instance, String command) {
GfshParseResult parseResult = parse(command);
diff --git a/geode-web/src/test/java/org/apache/geode/management/internal/cli/commands/CommandOverHttpDUnitTest.java b/geode-web/src/test/java/org/apache/geode/management/internal/cli/commands/CommandOverHttpDUnitTest.java
index 4844e35..99f0b10 100644
--- a/geode-web/src/test/java/org/apache/geode/management/internal/cli/commands/CommandOverHttpDUnitTest.java
+++ b/geode-web/src/test/java/org/apache/geode/management/internal/cli/commands/CommandOverHttpDUnitTest.java
@@ -29,8 +29,8 @@ import org.apache.geode.test.junit.runners.SuiteRunner;
*/
@Category({DistributedTest.class, SecurityTest.class})
@RunWith(SuiteRunner.class)
-@Suite.SuiteClasses({GemfireDataCommandsDUnitTest.class, QueueCommandsDUnitTest.class,
- ShellCommandsDUnitTest.class, ShowStackTraceDUnitTest.class})
+@Suite.SuiteClasses({GemfireDataCommandsDUnitTest.class, ShellCommandsDUnitTest.class,
+ ShowStackTraceDUnitTest.class})
public class CommandOverHttpDUnitTest {
@ClassRule
public static ProvideSystemProperty provideSystemProperty =
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.