You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/04/12 15:42:52 UTC
[geode] branch develop updated: GEODE-4919: Update the PRConfig
(#1666)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new bdbc0e3 GEODE-4919: Update the PRConfig (#1666)
bdbc0e3 is described below
commit bdbc0e32b5e7cc7ed4fc916f27ef02f3d0da6568
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Thu Apr 12 08:42:46 2018 -0700
GEODE-4919: Update the PRConfig (#1666)
* Before adding any gateway sender or AEQ, now do the following:
* Assign buckets to the partitioned region.
* Update the PartitionRegionConfig for the region with the updated set of gateway senders.
* For a Lucene reindex, the AEQs will be created at this point.
* Mutators then add the gateway senders to the internal region attributes.
---
.../internal/cache/PartitionRegionConfig.java | 4 +
.../geode/internal/cache/PartitionedRegion.java | 54 ++-
.../cli/functions/RegionAlterFunction.java | 37 +-
.../cache/lucene/internal/LuceneServiceImpl.java | 3 +-
.../cache/lucene/LuceneIndexCreationDUnitTest.java | 14 +-
.../WANClusterConfigurationDUnitTest.java | 392 +++++++++++++++++++++
6 files changed, 492 insertions(+), 12 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
index a2cc208..f531190 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionRegionConfig.java
@@ -89,6 +89,10 @@ public class PartitionRegionConfig extends ExternalizableDSFID implements Versio
private ArrayList<String> partitionListenerClassNames = new ArrayList<String>();
+ /**
+ * Replaces the gateway sender ids recorded in this config.
+ *
+ * @param gatewaySenderIds the new ids; stored as an unmodifiable view of this set (not a copy)
+ */
+ public void setGatewaySenderIds(Set<String> gatewaySenderIds) {
+ final Set<String> readOnlyIds = Collections.unmodifiableSet(gatewaySenderIds);
+ this.gatewaySenderIds = readOnlyIds;
+ }
+
private Set<String> gatewaySenderIds = Collections.emptySet();
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 51de306..455087c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache;
+import static java.util.stream.Collectors.toSet;
import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator;
import java.io.IOException;
@@ -1225,6 +1226,27 @@ public class PartitionedRegion extends LocalRegion
}
}
+ /**
+ * Assigns buckets for this region, then overwrites the gateway sender ids stored in this
+ * region's PartitionRegionConfig with the supplied set and hands the config to updatePRConfig.
+ *
+ * @param gatewaySendersToAdd the complete set of sender ids to record; it replaces, rather than
+ * extends, whatever the config held before
+ */
+ public void updatePRConfigWithNewSetOfGatewaySenders(Set<String> gatewaySendersToAdd) {
+ PartitionRegionHelper.assignBucketsToPartitions(this);
+ final PartitionRegionConfig regionConfig = prRoot.get(getRegionIdentifier());
+ regionConfig.setGatewaySenderIds(gatewaySendersToAdd);
+ updatePRConfig(regionConfig, false);
+ }
+
+ /**
+ * Assigns buckets for this region, then adds one id to the gateway sender ids stored in this
+ * region's PartitionRegionConfig and hands the config to updatePRConfig.
+ *
+ * @param aeqId the sender/AEQ id to add to the ids already recorded in the config
+ */
+ public void updatePRConfigWithNewGatewaySender(String aeqId) {
+ PartitionRegionHelper.assignBucketsToPartitions(this);
+ final PartitionRegionConfig regionConfig = prRoot.get(getRegionIdentifier());
+ // Copy the current ids (if any) into a fresh mutable set before adding the new one.
+ final Set<String> currentIds = regionConfig.getGatewaySenderIds();
+ final Set<String> mergedIds =
+ (currentIds == null) ? new HashSet<>() : new HashSet<>(currentIds);
+ mergedIds.add(aeqId);
+ regionConfig.setGatewaySenderIds(mergedIds);
+ updatePRConfig(regionConfig, false);
+ }
+
public void removeGatewaySenderId(String gatewaySenderId) {
super.removeGatewaySenderId(gatewaySenderId);
new UpdateAttributesProcessor(this).distribute();
@@ -1347,7 +1369,7 @@ public class PartitionedRegion extends LocalRegion
prConfig = this.prRoot.get(getRegionIdentifier());
if (prConfig == null) {
- validateParalleGatewaySenderIds();
+ validateParallelGatewaySenderIds();
this.partitionedRegionId = generatePRId(getSystem());
prConfig = new PartitionRegionConfig(this.partitionedRegionId, this.getFullPath(),
prAttribs, this.getScope(), getAttributes().getEvictionAttributes(),
@@ -1454,10 +1476,35 @@ public class PartitionedRegion extends LocalRegion
}
}
- public void validateParalleGatewaySenderIds() throws PRLocallyDestroyedException {
- for (String senderId : this.getParallelGatewaySenderIds()) {
+ /**
+ * Validates the parallel gateway sender ids returned by getParallelGatewaySenderIds() for this
+ * region by delegating to the overload that takes an explicit id set.
+ *
+ * @throws PRLocallyDestroyedException propagated from the id-set overload
+ */
+ public void validateParallelGatewaySenderIds() throws PRLocallyDestroyedException {
+ final Set<String> ownParallelSenderIds = getParallelGatewaySenderIds();
+ validateParallelGatewaySenderIds(ownParallelSenderIds);
+ }
+
+ /**
+ * Returns the subset of the given gateway sender ids that belong to parallel gateway senders
+ * known to this region's cache.
+ *
+ * @param senderIds gateway sender ids to inspect; the set itself is not modified
+ * @return a new mutable set holding only those ids from {@code senderIds} whose sender is parallel
+ */
+ public Set<String> filterOutNonParallelGatewaySenders(Set<String> senderIds) {
+ // Ids of every parallel sender currently registered with the cache.
+ Set<String> allParallelSenders = cache.getAllGatewaySenders().stream()
+ .filter(GatewaySender::isParallel).map(GatewaySender::getId).collect(toSet());
+ // Intersect sequentially on a private copy: HashSet is not thread-safe, so it must not be
+ // populated from the forEach of a parallel stream.
+ Set<String> parallelSenders = new HashSet<>(senderIds);
+ parallelSenders.retainAll(allParallelSenders);
+ return parallelSenders;
+ }
+
+ public void validateParallelGatewaySenderIds(Set<String> parallelGatewaySenderIds)
+ throws PRLocallyDestroyedException {
+ for (String senderId : parallelGatewaySenderIds) {
for (PartitionRegionConfig config : this.prRoot.values()) {
if (config.getGatewaySenderIds().contains(senderId)) {
+ if (this.getFullPath().equals(config.getFullPath())) {
+ // The sender is already attached to this region
+ continue;
+ }
Map<String, PartitionedRegion> colocationMap =
ColocationHelper.getAllColocationRegions(this);
if (!colocationMap.isEmpty()) {
@@ -9922,7 +9969,6 @@ public class PartitionedRegion extends LocalRegion
Set<Integer> allBuckets = userPR.getDataStore().getAllLocalBucketIds();
Set<Integer> allBucketsClone = new HashSet<Integer>();
allBucketsClone.addAll(allBuckets);
-
while (allBucketsClone.size() != 0) {
logger.debug(
"Need to wait until partitionedRegionQueue <<{}>> is loaded with all the buckets",
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java
index 8d113a1..b612944 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/functions/RegionAlterFunction.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.management.internal.cli.functions;
+import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -31,7 +32,9 @@ import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.internal.ClassPathLoader;
import org.apache.geode.internal.cache.AbstractRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.xmlcache.CacheXml;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.management.internal.cli.CliUtil;
@@ -181,10 +184,29 @@ public class RegionAlterFunction implements InternalFunction {
}
}
- // Alter Gateway Sender Ids
final Set<String> newGatewaySenderIds = regionAlterArgs.getGatewaySenderIds();
- if (newGatewaySenderIds != null) {
+ final Set<String> newAsyncEventQueueIds = regionAlterArgs.getAsyncEventQueueIds();
+
+
+ if (region instanceof PartitionedRegion) {
+ Set<String> senderIds = new HashSet<>();
+ if (newGatewaySenderIds != null) {
+ validateParallelGatewaySenderIDs((PartitionedRegion) region, newGatewaySenderIds);
+ senderIds.addAll(newGatewaySenderIds);
+ } else if (region.getGatewaySenderIds() != null) {
+ senderIds.addAll(region.getAllGatewaySenderIds());
+ }
+ if (newAsyncEventQueueIds != null) {
+ validateParallelGatewaySenderIDs((PartitionedRegion) region, newAsyncEventQueueIds);
+ senderIds.addAll(newAsyncEventQueueIds);
+ } else if (region.getAsyncEventQueueIds() != null) {
+ senderIds.addAll(region.getAsyncEventQueueIds());
+ }
+ ((PartitionedRegion) region).updatePRConfigWithNewSetOfGatewaySenders(senderIds);
+ }
+ // Alter Gateway Sender Ids
+ if (newGatewaySenderIds != null) {
// Remove old gateway sender ids that aren't in the new list
Set<String> oldGatewaySenderIds = region.getGatewaySenderIds();
if (!oldGatewaySenderIds.isEmpty()) {
@@ -208,7 +230,6 @@ public class RegionAlterFunction implements InternalFunction {
}
// Alter Async Queue Ids
- final Set<String> newAsyncEventQueueIds = regionAlterArgs.getAsyncEventQueueIds();
if (newAsyncEventQueueIds != null) {
// Remove old async event queue ids that aren't in the new list
@@ -285,6 +306,16 @@ public class RegionAlterFunction implements InternalFunction {
return region;
}
+ /**
+ * Narrows the given ids to parallel gateway senders and asks the region to validate them.
+ *
+ * @param region the partitioned region being altered
+ * @param newGatewaySenderIds candidate sender (or AEQ) ids requested for the region
+ * @throws IllegalStateException wrapping a PRLocallyDestroyedException raised during validation
+ */
+ private void validateParallelGatewaySenderIDs(PartitionedRegion region,
+ Set<String> newGatewaySenderIds) {
+ try {
+ region.validateParallelGatewaySenderIds(
+ region.filterOutNonParallelGatewaySenders(newGatewaySenderIds));
+ } catch (PRLocallyDestroyedException cause) {
+ throw new IllegalStateException("Partitioned Region not found registered", cause);
+ }
+ }
+
@SuppressWarnings("unchecked")
private static <K> Class<K> forName(String classToLoadName, String neededFor) {
Class<K> loadedClass = null;
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index ede3449..5756d71 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -24,8 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
@@ -227,6 +225,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
analyzer, fieldAnalyzers, serializer));
String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
+ region.updatePRConfigWithNewGatewaySender(aeqId);
LuceneIndexImpl luceneIndex = beforeDataRegionCreated(indexName, regionPath,
region.getAttributes(), analyzer, fieldAnalyzers, aeqId, serializer, fields);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java
index 8f887a1..0a09934 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java
@@ -57,6 +57,7 @@ import org.apache.geode.cache.lucene.internal.repository.serializer.Heterogeneou
import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
import org.apache.geode.cache.lucene.test.TestObject;
import org.apache.geode.internal.i18n.LocalizedStrings;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.categories.LuceneTest;
@@ -314,7 +315,8 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest {
@Test
@Parameters({"PARTITION", "PARTITION_REDUNDANT"})
- public void creatingIndexAfterRegionInTwoMembersSucceed(RegionTestableType regionType) {
+ public void creatingIndexAfterRegionInTwoMembersSucceed(RegionTestableType regionType)
+ throws Exception {
dataStore1.invoke(() -> {
regionType.createDataStore(getCache(), REGION_NAME);
});
@@ -323,14 +325,20 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest {
regionType.createDataStore(getCache(), REGION_NAME);
});
- dataStore1.invoke(() -> {
+ AsyncInvocation createIndex1 = dataStore1.invokeAsync(() -> {
createIndexAfterRegion("field1");
});
- dataStore2.invoke(() -> {
+ AsyncInvocation createIndex2 = dataStore2.invokeAsync(() -> {
createIndexAfterRegion("field1");
});
+ createIndex1.join();
+ createIndex2.join();
+
+ createIndex1.checkException();
+ createIndex2.checkException();
+
dataStore1.invoke(() -> {
putEntryAndQuery();
});
diff --git a/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/WANClusterConfigurationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/WANClusterConfigurationDUnitTest.java
index 8da4f9e..121dc56 100644
--- a/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/WANClusterConfigurationDUnitTest.java
+++ b/geode-wan/src/test/java/org/apache/geode/management/internal/configuration/WANClusterConfigurationDUnitTest.java
@@ -20,8 +20,11 @@ import static org.apache.geode.test.dunit.Assert.assertTrue;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -29,7 +32,12 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.management.ManagementService;
+import org.apache.geode.management.cli.Result;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
+import org.apache.geode.management.internal.cli.result.CommandResult;
+import org.apache.geode.management.internal.cli.result.CompositeResultData;
+import org.apache.geode.management.internal.cli.result.TabularResultData;
import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
@@ -58,6 +66,390 @@ public class WANClusterConfigurationDUnitTest {
}
+ /**
+ * Creates a parallel gateway sender "ny" and two PARTITION_REDUNDANT regions ("test1", "test2")
+ * that are not colocated, then checks that attaching "ny" to test1 succeeds while attaching the
+ * same sender to test2 is reported as an error.
+ */
@Test
+ public void whenAlteringNoncolocatedRegionsWithTheSameParallelSenderIdThenFailure()
+ throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test2");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ // Second, non-colocated region asking for the same parallel sender: must be rejected.
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test2");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsError();
+ }
+
+ /**
+ * Creates two parallel gateway senders ("ny", "ln") and one PARTITION_REDUNDANT region
+ * ("test1"), then checks that a single alter-region command attaching both senders succeeds.
+ */
+ @Test
+ public void whenAlteringOneRegionsWithDifferentParallelSenderIdThenSuccess() throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ // Attach both parallel senders to the region in one command.
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny,ln");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+ /**
+ * Creates two parallel gateway senders ("ny", "ln") and one PARTITION_REDUNDANT region
+ * ("test1"), attaches "ny" to the region, and then checks that altering the region again to
+ * "ny,ln" (a list that repeats the already-attached sender) succeeds.
+ */
+ @Test
+ public void whenAlteringOneRegionsWithDifferentParallelSenderIdsAfterItWasAlreadySetWithOneOfTheNewSenderThenSuccess()
+ throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ // Re-alter the same region with a list that includes the sender it already has.
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny,ln");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+
+ /**
+ * Creates two parallel gateway senders ("ny", "ln"), waits until the senders are listed, creates
+ * a PARTITION_REDUNDANT region ("test1") with "ny" attached at creation time, and then checks
+ * that altering the region to use "ln" instead succeeds.
+ */
+ @Test
+ public void whenAlteringOneRegionsWithDifferentParallelSenderIdAfterItWasAlreadyCreatedWithDifferentSenderThenSuccess()
+ throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+ addIgnoredException("Could not execute \"list gateways\"");
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ // The region below references "ny" at creation time, so wait for the senders to be listed.
+ waitTillAllGatewaySendersAreReady();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID, "ny");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ln");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+ /**
+ * Repeatedly runs the CliStrings.LIST_GATEWAY gfsh command, for at most one minute, until the
+ * command returns status OK and its gateway sender table contains four sender id values.
+ */
+ private void waitTillAllGatewaySendersAreReady() {
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> {
+ CommandStringBuilder csb2 = new CommandStringBuilder(CliStrings.LIST_GATEWAY);
+ CommandResult cmdResult = gfsh.executeCommand(csb2.toString());
+ assertThat(cmdResult).isNotNull();
+ assertThat(cmdResult.getStatus()).isSameAs(Result.Status.OK);
+ TabularResultData tableSenderResultData = ((CompositeResultData) cmdResult.getResultData())
+ .retrieveSection(CliStrings.SECTION_GATEWAY_SENDER)
+ .retrieveTable(CliStrings.TABLE_GATEWAY_SENDER);
+ List<String> senders =
+ tableSenderResultData.retrieveAllValues(CliStrings.RESULT_GATEWAY_SENDER_ID);
+ // NOTE(review): 4 is presumably 2 senders x 2 servers in the calling test - confirm.
+ assertThat(senders).hasSize(4);
+ });
+ }
+
+
+ /**
+ * Creates two parallel gateway senders ("ny", "ln") and one PARTITION_REDUNDANT region
+ * ("test1"), attaches "ny" to the region via alter-region, and then checks that a second
+ * alter-region switching the region to "ln" succeeds.
+ */
+ @Test
+ public void whenAlteringOneRegionsWithDifferentParallelSenderIdAfterItWasSetWithOneSenderThenSuccess()
+ throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ // Switch the same region from "ny" to "ln".
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ln");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+ /**
+ * Creates a gateway sender "ny" without the parallel option (a serial sender, per the test
+ * name) and two PARTITION_REDUNDANT regions that are not colocated, then checks that attaching
+ * "ny" to both regions succeeds.
+ */
+ @Test
+ public void whenAlteringNoncolocatedRegionsWithTheSameSerialSenderIdThenSuccess()
+ throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test2");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ // Same sender on a second, non-colocated region is expected to be accepted here.
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test2");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+ /**
+ * Creates two gateway senders ("ny", "ln") without the parallel option (serial senders, per the
+ * test name) and two PARTITION_REDUNDANT regions that are not colocated, then checks that
+ * attaching "ny" to test1 and "ln" to test2 both succeed.
+ */
+ @Test
+ public void whenAlteringNoncolocatedRegionsWithDifferentSerialGatewayIDThenSuccess()
+ throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test2");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test2");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ln");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+
+ /**
+ * Creates two parallel gateway senders ("ny", "ln") and two PARTITION_REDUNDANT regions that
+ * are not colocated, then checks that attaching "ny" to test1 and "ln" to test2 both succeed.
+ */
+ @Test
+ public void whenAlteringNoncolocatedRegionsWithDifferentParallelGatewayIDThenSuccess()
+ throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test2");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test2");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ln");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+ /**
+ * Creates a parallel gateway sender "ny", a PARTITION_REDUNDANT region "test1", and a second
+ * region "test2" colocated with test1, then checks that attaching "ny" to both colocated regions
+ * succeeds.
+ */
+ @Test
+ public void whenAlteringColocatedRegionsWithSameParallelGatewayIDThenSuccess() throws Exception {
+ addIgnoredException("could not get remote locator");
+ addIgnoredException("cannot have the same parallel gateway sender id");
+
+ MemberVM locator = clusterStartupRule.startLocatorVM(0);
+ MemberVM server1 = clusterStartupRule.startServerVM(1, locator.getPort());
+ MemberVM server2 = clusterStartupRule.startServerVM(2, locator.getPort());
+
+ // Connect Gfsh to locator.
+ gfsh.connectAndVerify(locator);
+
+ CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER);
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ny");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true");
+ csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "2");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ // test2 is colocated with test1, which is what allows the shared parallel sender below.
+ csb = new CommandStringBuilder(CliStrings.CREATE_REGION);
+ csb.addOption(CliStrings.CREATE_REGION__REGION, "test2");
+ csb.addOption(CliStrings.CREATE_REGION__COLOCATEDWITH, "test1");
+ csb.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION_REDUNDANT");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test1");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+
+ csb = new CommandStringBuilder(CliStrings.ALTER_REGION);
+ csb.addOption(CliStrings.ALTER_REGION__REGION, "test2");
+ csb.addOption(CliStrings.ALTER_REGION__GATEWAYSENDERID, "ny");
+ gfsh.executeAndAssertThat(csb.toString()).statusIsSuccess();
+ }
+
+
+ @Test
public void testCreateGatewaySenderReceiver() throws Exception {
addIgnoredException("could not get remote locator");
--
To stop receiving notification emails like this one, please contact
nnag@apache.org.