You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2020/10/13 17:11:53 UTC
[geode] branch feature/GEODE-7665 updated: GEODE-7845 blocking PR
region clear if one or more server versions are too old (#5577)
This is an automated email from the ASF dual-hosted git repository.
mhanson pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-7665 by this push:
new 55550d3 GEODE-7845 blocking PR region clear if one or more server versions are too old (#5577)
55550d3 is described below
commit 55550d31e2fcf63c22eef9153bdc0ac3ec4ba575
Author: mhansonp <ha...@vmware.com>
AuthorDate: Tue Oct 13 10:10:25 2020 -0700
GEODE-7845 blocking PR region clear if one or more server versions are too old (#5577)
- if a server is running an old version when a PR clear is invoked
by the client, the client will receive a ServerOperationException
with a cause of ServerVersionMismatchException.
---
.../integrationTest/resources/assembly_content.txt | 1 +
.../main/java/org/apache/geode/cache/Region.java | 2 +
.../cache/ServerVersionMismatchException.java | 34 ++
.../geode/internal/cache/PartitionedRegion.java | 1 +
.../internal/cache/PartitionedRegionClear.java | 34 +-
.../sanctioned-geode-core-serializables.txt | 1 +
.../internal/cache/PartitionedRegionClearTest.java | 109 ++++--
.../RollingUpgrade2DUnitTestBase.java | 4 +-
...ionRegionClearMixedServerPartitionedRegion.java | 412 +++++++++++++++++++++
9 files changed, 571 insertions(+), 27 deletions(-)
diff --git a/geode-assembly/src/integrationTest/resources/assembly_content.txt b/geode-assembly/src/integrationTest/resources/assembly_content.txt
index 549150f..553785a 100644
--- a/geode-assembly/src/integrationTest/resources/assembly_content.txt
+++ b/geode-assembly/src/integrationTest/resources/assembly_content.txt
@@ -245,6 +245,7 @@ javadoc/org/apache/geode/cache/RoleEvent.html
javadoc/org/apache/geode/cache/RoleException.html
javadoc/org/apache/geode/cache/Scope.html
javadoc/org/apache/geode/cache/SerializedCacheValue.html
+javadoc/org/apache/geode/cache/ServerVersionMismatchException.html
javadoc/org/apache/geode/cache/StatisticsDisabledException.html
javadoc/org/apache/geode/cache/SubscriptionAttributes.html
javadoc/org/apache/geode/cache/SynchronizationCommitConflictException.html
diff --git a/geode-core/src/main/java/org/apache/geode/cache/Region.java b/geode-core/src/main/java/org/apache/geode/cache/Region.java
index 4707a46..5162bd5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/Region.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/Region.java
@@ -1307,6 +1307,8 @@ public interface Region<K, V> extends ConcurrentMap<K, V> {
* @throws PartitionedRegionPartialClearException when data is partially cleared on partitioned
* region. It is caller responsibility to handle the partial data clear either by retrying
* the clear operation or continue working with the partially cleared partitioned region.
+ * @throws ServerVersionMismatchException when data was not cleared because one or more
+ * of the member servers' version was too old to understand the clear message.
*/
@Override
void clear();
diff --git a/geode-core/src/main/java/org/apache/geode/cache/ServerVersionMismatchException.java b/geode-core/src/main/java/org/apache/geode/cache/ServerVersionMismatchException.java
new file mode 100644
index 0000000..1d4231a
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/ServerVersionMismatchException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache;
+
+import java.util.List;
+
+/**
+ * Indicates a failure to perform an operation on a Partitioned Region due to
+ * server versions not meeting requirements.
+ *
+ * @since GEODE 1.14.0
+ */
+public class ServerVersionMismatchException extends CacheRuntimeException {
+ private static final long serialVersionUID = -3004093739855972548L;
+
+ public ServerVersionMismatchException(List<String> members, String featureName,
+ String version) {
+ super(
+ "A server's " + members + " version was too old (< " + version + ") for : " + featureName);
+
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index b9572c4..256850b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -5328,6 +5328,7 @@ public class PartitionedRegion extends LocalRegion
return this.totalNumberOfBuckets;
}
+
@Override
public void basicDestroy(final EntryEventImpl event, final boolean cacheWrite,
final Object expectedOldValue)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
index e8b01d8..0e5acfc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClear.java
@@ -14,8 +14,10 @@
*/
package org.apache.geode.internal.cache;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
@@ -25,11 +27,13 @@ import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.OperationAbortedException;
import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.ServerVersionMismatchException;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class PartitionedRegionClear {
@@ -289,7 +293,8 @@ public class PartitionedRegionClear {
}
final Set<InternalDistributedMember> configRecipients =
- new HashSet<>(partitionedRegion.getRegionAdvisor().adviseAllPRNodes());
+ new HashSet<>(partitionedRegion.getRegionAdvisor()
+ .adviseAllPRNodes());
try {
final PartitionRegionConfig prConfig =
@@ -310,8 +315,7 @@ public class PartitionedRegionClear {
try {
PartitionedRegionClearMessage.PartitionedRegionClearResponse resp =
new PartitionedRegionClearMessage.PartitionedRegionClearResponse(
- partitionedRegion.getSystem(),
- configRecipients);
+ partitionedRegion.getSystem(), configRecipients);
PartitionedRegionClearMessage partitionedRegionClearMessage =
new PartitionedRegionClearMessage(configRecipients, partitionedRegion, resp, op, event);
partitionedRegionClearMessage.send();
@@ -334,10 +338,34 @@ public class PartitionedRegionClear {
return bucketsOperated;
}
+ /**
+ * This method returns a boolean to indicate if all server versions support Partition Region clear
+ */
+ public void allServerVersionsSupportPartitionRegionClear() {
+ List<String> memberNames = new ArrayList<>();
+ for (int i = 0; i < partitionedRegion.getTotalNumberOfBuckets(); i++) {
+ InternalDistributedMember internalDistributedMember = partitionedRegion.getBucketPrimary(i);
+ if ((internalDistributedMember != null)
+ && (internalDistributedMember.getVersion().isOlderThan(KnownVersion.GEODE_1_14_0))) {
+ if (!memberNames.contains(internalDistributedMember.getName())) {
+ memberNames.add(internalDistributedMember.getName());
+ logger.info("MLH adding " + internalDistributedMember.getName());
+ }
+ }
+ }
+ if (!memberNames.isEmpty()) {
+ throw new ServerVersionMismatchException(memberNames, "Partitioned Region Clear",
+ KnownVersion.GEODE_1_14_0.toString());
+ }
+ }
+
+
void doClear(RegionEventImpl regionEvent, boolean cacheWrite) {
String lockName = CLEAR_OPERATION + partitionedRegion.getName();
long clearStartTime = 0;
+ allServerVersionsSupportPartitionRegionClear();
+
try {
// distributed lock to make sure only one clear op is in progress in the cluster.
acquireDistributedClearLock(lockName);
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 644fbc2..86e2372 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -92,6 +92,7 @@ org/apache/geode/cache/ResourceException,true,-5559328592343363268
org/apache/geode/cache/ResumptionAction,true,6632254151314915610,ordinal:byte
org/apache/geode/cache/RoleException,true,-7521056108445887394
org/apache/geode/cache/Scope,true,5534399159504301602,ordinal:int
+org/apache/geode/cache/ServerVersionMismatchException,true,-3004093739855972548
org/apache/geode/cache/StatisticsDisabledException,true,-2987721454129719551
org/apache/geode/cache/SynchronizationCommitConflictException,true,2619806460255259492
org/apache/geode/cache/TimeoutException,true,-6260761691185737442
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
index bd37d9e..bd78fd0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionClearTest.java
@@ -37,6 +37,7 @@ import org.mockito.ArgumentCaptor;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.PartitionedRegionPartialClearException;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.ServerVersionMismatchException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionManager;
@@ -44,6 +45,7 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
+import org.apache.geode.internal.serialization.KnownVersion;
public class PartitionedRegionClearTest {
@@ -51,6 +53,8 @@ public class PartitionedRegionClearTest {
private PartitionedRegionClear partitionedRegionClear;
private DistributionManager distributionManager;
private PartitionedRegion partitionedRegion;
+ private RegionAdvisor regionAdvisor;
+ private InternalDistributedMember internalDistributedMember;
@Before
public void setUp() {
@@ -62,6 +66,14 @@ public class PartitionedRegionClearTest {
when(partitionedRegion.getName()).thenReturn("prRegion");
partitionedRegionClear = new PartitionedRegionClear(partitionedRegion);
+ internalDistributedMember = mock(InternalDistributedMember.class);
+ when(internalDistributedMember.getVersion()).thenReturn(KnownVersion.CURRENT);
+ regionAdvisor = mock(RegionAdvisor.class);
+ when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+ when(regionAdvisor.getDistributionManager()).thenReturn(distributionManager);
+ when(distributionManager.getDistributionManagerId()).thenReturn(internalDistributedMember);
+ when(distributionManager.getId()).thenReturn(internalDistributedMember);
+
}
private Set<BucketRegion> setupBucketRegions(
@@ -85,7 +97,6 @@ public class PartitionedRegionClearTest {
@Test
public void isLockedForListenerAndClientNotificationReturnsTrueWhenLocked() {
- InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true);
partitionedRegionClear.obtainClearLockLocal(internalDistributedMember);
@@ -94,7 +105,6 @@ public class PartitionedRegionClearTest {
@Test
public void isLockedForListenerAndClientNotificationReturnsFalseWhenMemberNotInTheSystemRequestsLock() {
- InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(false);
assertThat(partitionedRegionClear.isLockedForListenerAndClientNotification()).isFalse();
@@ -132,8 +142,6 @@ public class PartitionedRegionClearTest {
doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear)
.attemptToSendPartitionedRegionClearMessage(regionEvent,
PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR);
- InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
- when(distributionManager.getId()).thenReturn(internalDistributedMember);
spyPartitionedRegionClear.obtainLockForClear(regionEvent);
@@ -152,8 +160,6 @@ public class PartitionedRegionClearTest {
doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear)
.attemptToSendPartitionedRegionClearMessage(regionEvent,
PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR);
- InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
- when(distributionManager.getId()).thenReturn(internalDistributedMember);
spyPartitionedRegionClear.releaseLockForClear(regionEvent);
@@ -172,8 +178,6 @@ public class PartitionedRegionClearTest {
doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear)
.attemptToSendPartitionedRegionClearMessage(regionEvent,
PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR);
- InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
- when(distributionManager.getId()).thenReturn(internalDistributedMember);
spyPartitionedRegionClear.clearRegion(regionEvent);
@@ -330,13 +334,12 @@ public class PartitionedRegionClearTest {
PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
- InternalDistributedMember member = mock(InternalDistributedMember.class);
- when(distributionManager.isCurrentMember(member)).thenReturn(true);
+ when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true);
- partitionedRegionClear.obtainClearLockLocal(member);
+ partitionedRegionClear.obtainClearLockLocal(internalDistributedMember);
assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
- .isSameAs(member);
+ .isSameAs(internalDistributedMember);
for (BucketRegion bucketRegion : buckets) {
verify(bucketRegion, times(1)).lockLocallyForClear(partitionedRegion.getDistributionManager(),
partitionedRegion.getMyId(), null);
@@ -350,10 +353,9 @@ public class PartitionedRegionClearTest {
PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
- InternalDistributedMember member = mock(InternalDistributedMember.class);
- when(distributionManager.isCurrentMember(member)).thenReturn(false);
+ when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(false);
- partitionedRegionClear.obtainClearLockLocal(member);
+ partitionedRegionClear.obtainClearLockLocal(internalDistributedMember);
assertThat(partitionedRegionClear.lockForListenerAndClientNotification.getLockRequester())
.isNull();
@@ -370,9 +372,9 @@ public class PartitionedRegionClearTest {
PartitionedRegionDataStore partitionedRegionDataStore = mock(PartitionedRegionDataStore.class);
Set<BucketRegion> buckets = setupBucketRegions(partitionedRegionDataStore, bucketAdvisor);
when(partitionedRegion.getDataStore()).thenReturn(partitionedRegionDataStore);
- InternalDistributedMember member = mock(InternalDistributedMember.class);
- when(distributionManager.isCurrentMember(member)).thenReturn(true);
- partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member);
+ when(distributionManager.isCurrentMember(internalDistributedMember)).thenReturn(true);
+ partitionedRegionClear.lockForListenerAndClientNotification
+ .setLocked(internalDistributedMember);
partitionedRegionClear.releaseClearLockLocal();
@@ -405,13 +407,11 @@ public class PartitionedRegionClearTest {
Region<String, PartitionRegionConfig> prRoot = mock(Region.class);
when(partitionedRegion.getPRRoot()).thenReturn(prRoot);
InternalDistributedMember member = mock(InternalDistributedMember.class);
- RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
Set<InternalDistributedMember> prNodes = Collections.singleton(member);
Node node = mock(Node.class);
when(node.getMemberId()).thenReturn(member);
Set<Node> configNodes = Collections.singleton(node);
when(regionAdvisor.adviseAllPRNodes()).thenReturn(prNodes);
- when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
PartitionRegionConfig partitionRegionConfig = mock(PartitionRegionConfig.class);
when(partitionRegionConfig.getNodes()).thenReturn(configNodes);
when(prRoot.get(any())).thenReturn(partitionRegionConfig);
@@ -423,7 +423,7 @@ public class PartitionedRegionClearTest {
when(txManager.isDistributed()).thenReturn(false);
when(internalCache.getTxManager()).thenReturn(txManager);
when(partitionedRegion.getCache()).thenReturn(internalCache);
-
+ when(member.getVersion()).thenReturn(KnownVersion.getCurrentVersion());
when(distributionManager.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
when(distributionManager.getStats()).thenReturn(mock(DMStats.class));
@@ -433,6 +433,8 @@ public class PartitionedRegionClearTest {
verify(distributionManager, times(1)).putOutgoing(any());
}
+
+
@Test
public void doClearAcquiresAndReleasesDistributedClearLockAndCreatesAllPrimaryBuckets() {
RegionEventImpl regionEvent = mock(RegionEventImpl.class);
@@ -458,7 +460,6 @@ public class PartitionedRegionClearTest {
doReturn(Collections.EMPTY_SET).when(spyPartitionedRegionClear).clearRegion(regionEvent);
spyPartitionedRegionClear.doClear(regionEvent, cacheWrite);
-
verify(spyPartitionedRegionClear, times(1)).invokeCacheWriter(regionEvent);
}
@@ -558,6 +559,70 @@ public class PartitionedRegionClearTest {
}
@Test
+ public void doClearThrowsServerVersionMismatchException() {
+ boolean cacheWrite = false;
+ RegionEventImpl regionEvent = mock(RegionEventImpl.class);
+ when(partitionedRegion.hasListener()).thenReturn(false);
+ when(partitionedRegion.hasAnyClientsInterested()).thenReturn(false);
+ when(partitionedRegion.getTotalNumberOfBuckets()).thenReturn(2);
+ when(partitionedRegion.getName()).thenReturn("prRegion");
+ PartitionedRegionClear spyPartitionedRegionClear = spy(partitionedRegionClear);
+ doNothing().when(spyPartitionedRegionClear).acquireDistributedClearLock(any());
+ doNothing().when(spyPartitionedRegionClear).assignAllPrimaryBuckets();
+ doNothing().when(spyPartitionedRegionClear).obtainLockForClear(regionEvent);
+ doNothing().when(spyPartitionedRegionClear).releaseLockForClear(regionEvent);
+ doReturn(Collections.singleton("2")).when(spyPartitionedRegionClear).clearRegion(regionEvent);
+
+ when(regionEvent.clone()).thenReturn(mock(RegionEventImpl.class));
+ Region<String, PartitionRegionConfig> prRoot = mock(Region.class);
+ when(partitionedRegion.getPRRoot()).thenReturn(prRoot);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ InternalDistributedMember oldMember = mock(InternalDistributedMember.class);
+ Set<InternalDistributedMember> prNodes = new HashSet<>();
+ prNodes.add(member);
+ prNodes.add(oldMember);
+ Node node = mock(Node.class);
+ Node oldNode = mock(Node.class);
+ when(member.getName()).thenReturn("member");
+ when(oldMember.getName()).thenReturn("oldMember");
+ when(node.getMemberId()).thenReturn(member);
+ when(oldNode.getMemberId()).thenReturn(oldMember);
+ Set<Node> configNodes = new HashSet<>();
+ configNodes.add(node);
+ configNodes.add(oldNode);
+ when(partitionedRegion.getBucketPrimary(0)).thenReturn(member);
+ when(partitionedRegion.getBucketPrimary(1)).thenReturn(oldMember);
+
+ when(regionAdvisor.adviseAllPRNodes()).thenReturn(prNodes);
+ PartitionRegionConfig partitionRegionConfig = mock(PartitionRegionConfig.class);
+ when(partitionRegionConfig.getNodes()).thenReturn(configNodes);
+ when(prRoot.get(any())).thenReturn(partitionRegionConfig);
+ InternalDistributedSystem internalDistributedSystem = mock(InternalDistributedSystem.class);
+ when(internalDistributedSystem.getDistributionManager()).thenReturn(distributionManager);
+ when(partitionedRegion.getSystem()).thenReturn(internalDistributedSystem);
+ InternalCache internalCache = mock(InternalCache.class);
+ TXManagerImpl txManager = mock(TXManagerImpl.class);
+ when(txManager.isDistributed()).thenReturn(false);
+ when(internalCache.getTxManager()).thenReturn(txManager);
+ when(partitionedRegion.getCache()).thenReturn(internalCache);
+ when(oldMember.getVersion()).thenReturn(KnownVersion.GEODE_1_11_0);
+ when(member.getVersion()).thenReturn(KnownVersion.getCurrentVersion());
+ when(distributionManager.getCancelCriterion()).thenReturn(mock(CancelCriterion.class));
+ when(distributionManager.getStats()).thenReturn(mock(DMStats.class));
+
+
+ Throwable thrown =
+ catchThrowable(() -> spyPartitionedRegionClear.doClear(regionEvent, cacheWrite));
+
+ assertThat(thrown)
+ .isInstanceOf(ServerVersionMismatchException.class)
+ .hasMessage(
+ "A server's [oldMember] version was too old (< GEODE 1.14.0) for : Partitioned Region Clear");
+ }
+
+
+
+ @Test
public void handleClearFromDepartedMemberReleasesTheLockForRequesterDeparture() {
InternalDistributedMember member = mock(InternalDistributedMember.class);
partitionedRegionClear.lockForListenerAndClientNotification.setLocked(member);
diff --git a/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java
index 293bc69..6181e56 100755
--- a/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java
+++ b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgrade2DUnitTestBase.java
@@ -987,7 +987,7 @@ public abstract class RollingUpgrade2DUnitTestBase extends JUnit4DistributedTest
return clientCache;
}
- private static boolean assertRegionExists(GemFireCache cache, String regionName) {
+ protected static boolean assertRegionExists(GemFireCache cache, String regionName) {
Region region = cache.getRegion(regionName);
if (region == null) {
throw new Error("Region: " + regionName + " does not exist");
@@ -995,7 +995,7 @@ public abstract class RollingUpgrade2DUnitTestBase extends JUnit4DistributedTest
return true;
}
- private static Region getRegion(GemFireCache cache, String regionName) {
+ protected static Region getRegion(GemFireCache cache, String regionName) {
return cache.getRegion(regionName);
}
diff --git a/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.java b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.java
new file mode 100644
index 0000000..bfcd651
--- /dev/null
+++ b/geode-core/src/upgradeTest/java/org/apache/geode/internal/cache/rollingupgrade/RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.rollingupgrade;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.GemFireCache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.ServerVersionMismatchException;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.DUnitLauncher;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+import org.apache.geode.test.version.VersionManager;
+
+@RunWith(Parameterized.class)
+@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class RollingUpgradePartitionRegionClearMixedServerPartitionedRegion
+ extends JUnit4DistributedTestCase {
+
+ protected static final Logger logger = LogService.getLogger();
+ protected static GemFireCache cache;
+ protected static ClientCache clientcache;
+
+ @Parameter
+ public String oldVersion;
+
+ @Parameters(name = "from_v{0}")
+ public static Collection<String> data() {
+ List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent();
+ if (result.size() < 1) {
+ throw new RuntimeException("No older versions of Geode were found to test against");
+ } else {
+ System.out.println("running against these versions: " + result);
+ }
+ return result;
+ }
+
+ @Test
+ public void testPutAndGetMixedServerPartitionedRegion() throws Exception {
+ doTestPutAndGetMixedServers(oldVersion);
+ }
+
+ /**
+ * This test starts up multiple servers from the current code base and multiple servers from the
+ * old version and executes puts and gets on a new server and old server and verifies that the
+ * results are present. Note that the puts have overlapping region keys just to test new puts and
+ * replaces
+ */
+ void doTestPutAndGetMixedServers(String oldVersion)
+ throws Exception {
+ VM currentServer1 = VM.getVM(VersionManager.CURRENT_VERSION, 0);
+ VM oldServerAndLocator = VM.getVM(oldVersion, 1);
+ VM currentServer2 = VM.getVM(VersionManager.CURRENT_VERSION, 2);
+ VM oldServer2 = VM.getVM(oldVersion, 3);
+
+ String regionName = "aRegion";
+
+ final String serverHostName = NetworkUtils.getServerHostName();
+ final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+ oldServerAndLocator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port));
+ try {
+ final Properties props = getSystemProperties();
+ props.remove(DistributionConfig.LOCATORS_NAME);
+
+ // Fire up the locator and server
+ oldServerAndLocator.invoke(() -> {
+ props.put(DistributionConfig.START_LOCATOR_NAME,
+ "" + serverHostName + "[" + port + "]");
+ props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ cache = createCache(props);
+ Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
+ });
+
+ props.put(DistributionConfig.LOCATORS_NAME, serverHostName + "[" + port + "]");
+
+ // create the cache in all the server VMs.
+ for (VM vm : Arrays.asList(oldServer2, currentServer1, currentServer2)) {
+ vm.invoke(() -> {
+ cache = createCache(props);
+ });
+ }
+ // spin up current version servers
+ for (VM vm : Arrays.asList(currentServer1, currentServer2)) {
+ vm.invoke(
+ () -> assertVersion(cache, VersionManager.getInstance().getCurrentVersionOrdinal()));
+ }
+
+ // create region
+ for (VM vm : Arrays.asList(currentServer1, currentServer2, oldServerAndLocator, oldServer2)) {
+ vm.invoke(() -> createRegion(cache, regionName));
+ }
+
+ // put some data in the region to make sure there is something to clear.
+ putDataSerializableAndVerify(currentServer1, regionName, currentServer2, oldServerAndLocator,
+ oldServer2);
+
+ // invoke Partition Region Clear and verify we didn't touch the old servers.
+
+ currentServer1.invoke(() -> {
+ assertRegionExists(cache, regionName);
+ PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionName);
+
+ Throwable thrown = catchThrowable(region::clear);
+ assertThat(thrown).isInstanceOf(ServerVersionMismatchException.class);
+
+ });
+ } finally {
+ for (VM vm : Arrays.asList(currentServer1, currentServer2, oldServerAndLocator, oldServer2)) {
+ vm.invoke(
+ () -> closeCache(RollingUpgradePartitionRegionClearMixedServerPartitionedRegion.cache));
+ }
+ }
+ }
+
+ @Test
+ public void TestClientServerGetsUnsupportedExceptionWhenPRClearInvoked() throws Exception {
+ doTestClientServerGetsUnsupportedExceptionWhenPRClearInvoked(oldVersion);
+ }
+
+ void doTestClientServerGetsUnsupportedExceptionWhenPRClearInvoked(String oldVersion)
+ throws Exception {
+
+ VM client = VM.getVM(VersionManager.CURRENT_VERSION, 0);
+ VM locator = VM.getVM(VersionManager.CURRENT_VERSION, 1);
+ VM currentServer = VM.getVM(VersionManager.CURRENT_VERSION, 2);
+ VM oldServer2 = VM.getVM(oldVersion, 3);
+
+ for (VM vm : Arrays.asList(locator, currentServer, client)) {
+ vm.invoke(() -> System.setProperty("gemfire.allow_old_members_to_join_for_testing", "true"));
+ }
+
+ String regionName = "aRegion";
+
+ final String serverHostName = NetworkUtils.getServerHostName();
+ final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+ locator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(port));
+ try {
+ final Properties props = getSystemProperties();
+ props.remove(DistributionConfig.LOCATORS_NAME);
+
+ // Fire up the locator and server
+ locator.invoke(() -> {
+ props.put(DistributionConfig.START_LOCATOR_NAME,
+ "" + serverHostName + "[" + port + "]");
+ props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+ cache = createCache(props);
+ });
+
+ props.put(DistributionConfig.LOCATORS_NAME, serverHostName + "[" + port + "]");
+
+ // create the cache in all the server VMs.
+ for (VM vm : Arrays.asList(oldServer2, currentServer)) {
+ vm.invoke(() -> {
+ props.setProperty(DistributionConfig.NAME_NAME, "vm" + VM.getVMId());
+ cache = createCache(props);
+ });
+ }
+ int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+
+ oldServer2.invoke(() -> startCacheServer(cache, ports[0]));
+ currentServer.invoke(() -> startCacheServer(cache, ports[1]));
+
+ // create region
+ for (VM vm : Arrays.asList(currentServer, locator, oldServer2)) {
+ vm.invoke(() -> createRegion(cache, regionName));
+ }
+
+ // put some data in the region to make sure there is something to clear.
+ putDataSerializableAndVerify(currentServer, regionName, locator, oldServer2);
+
+ // invoke Partition Region Clear from the client and verify the exception.
+ client.invoke(() -> {
+ clientcache = new ClientCacheFactory().addPoolServer(serverHostName, ports[1]).create();
+ Region<Object, Object> clientRegion = clientcache.createClientRegionFactory(
+ ClientRegionShortcut.PROXY).create(regionName);
+
+ clientRegion.put("key", "value");
+
+ Throwable thrown = catchThrowable(clientRegion::clear);
+ assertThat(thrown).isInstanceOf(ServerOperationException.class);
+ assertThat(thrown).hasCauseInstanceOf(ServerVersionMismatchException.class);
+ ServerVersionMismatchException serverVersionMismatchException =
+ (ServerVersionMismatchException) thrown.getCause();
+ assertThat(serverVersionMismatchException.getMessage()).contains("vm3");
+ });
+
+ } finally {
+
+ for (VM vm : Arrays.asList(currentServer, locator, oldServer2)) {
+ vm.invoke(() -> closeCache(cache));
+ }
+
+ client.invoke(() -> {
+ if (cache != null && !clientcache.isClosed()) {
+ clientcache.close(false);
+ }
+ });
+ }
+ }
+
+ private String getLocatorString(int locatorPort) {
+ return getDUnitLocatorAddress() + "[" + locatorPort + "]";
+ }
+
+ public String getLocatorString(int[] locatorPorts) {
+ StringBuilder locatorString = new StringBuilder();
+ int numLocators = locatorPorts.length;
+ for (int i = 0; i < numLocators; i++) {
+ locatorString.append(getLocatorString(locatorPorts[i]));
+ if (i + 1 < numLocators) {
+ locatorString.append(",");
+ }
+ }
+ return locatorString.toString();
+ }
+
+ private Cache createCache(Properties systemProperties) {
+ systemProperties.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+ if (VersionManager.getInstance().getCurrentVersionOrdinal() < 75) {
+ systemProperties.remove("validate-serializable-objects");
+ systemProperties.remove("serializable-object-filter");
+ }
+ CacheFactory cf = new CacheFactory(systemProperties);
+ return cf.create();
+ }
+
+ private void startCacheServer(GemFireCache cache, int port) throws Exception {
+ CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
+ cacheServer.setPort(port);
+ cacheServer.start();
+ }
+
+ protected void assertRegionExists(GemFireCache cache, String regionName) {
+ Region<Object, Object> region = cache.getRegion(regionName);
+ if (region == null) {
+ throw new Error("Region: " + regionName + " does not exist");
+ }
+ }
+
+ private void assertEntryExists(GemFireCache cache, String regionName) {
+ assertRegionExists(cache, regionName);
+ Region<Object, Object> region = cache.getRegion(regionName);
+ for (int i = 0; i < 10; i++) {
+ String key = "" + i;
+ Object regionValue = region.get(key);
+ assertThat(regionValue).describedAs("Entry for key:" + key + " does not exist").isNotNull();
+ }
+ }
+
+ public void put(GemFireCache cache, String regionName, Object key, Object value) {
+ Region<Object, Object> region = cache.getRegion(regionName);
+ System.out.println(regionName + ".put(" + key + "," + value + ")");
+ Object result = region.put(key, value);
+ System.out.println("returned " + result);
+ }
+
+ private void createRegion(GemFireCache cache, String regionName) {
+ RegionFactory<Object, Object> rf = ((GemFireCacheImpl) cache).createRegionFactory(
+ RegionShortcut.PARTITION);
+ System.out.println("created region " + rf.create(regionName));
+ }
+
+ void assertVersion(GemFireCache cache, short ordinal) {
+ DistributedSystem system = cache.getDistributedSystem();
+ int thisOrdinal =
+ ((InternalDistributedMember) system.getDistributedMember()).getVersion()
+ .ordinal();
+ if (ordinal != thisOrdinal) {
+ throw new Error(
+ "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal);
+ }
+ }
+
+ private void closeCache(GemFireCache cache) {
+ if (cache == null) {
+ return;
+ }
+ boolean cacheClosed = cache.isClosed();
+ if (!cacheClosed) {
+ List<CacheServer> servers = ((Cache) cache).getCacheServers();
+ for (CacheServer server : servers) {
+ server.stop();
+ }
+ cache.close();
+ }
+ }
+
+ /**
+ * Get the port that the standard dunit locator is listening on.
+ *
+ */
+ private String getDUnitLocatorAddress() {
+ return Host.getHost(0).getHostName();
+ }
+
+ private void deleteVMFiles() {
+ System.out.println("deleting files in vm" + VM.getVMId());
+ File pwd = new File(".");
+ for (File entry : pwd.listFiles()) {
+ try {
+ if (entry.isDirectory()) {
+ FileUtils.deleteDirectory(entry);
+ } else {
+ if (!entry.delete()) {
+ System.out.println("Could not delete " + entry);
+ }
+ }
+ } catch (Exception e) {
+ System.out.println("Could not delete " + entry + ": " + e.getMessage());
+ }
+ }
+ }
+
+ @Override
+ public void postSetUp() {
+ Invoke.invokeInEveryVM("delete files", this::deleteVMFiles);
+ IgnoredException.addIgnoredException(
+ "cluster configuration service not available|ConflictingPersistentDataException");
+ }
+
+
+ void putDataSerializableAndVerify(VM putter, String regionName,
+ VM... vms) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ Class aClass = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.ExpirationAttributes");
+ Constructor constructor = aClass.getConstructor(int.class);
+ Object testDataSerializable = constructor.newInstance(i);
+ int finalI = i;
+ putter.invoke(() -> put(cache, regionName, "" + finalI, testDataSerializable));
+ }
+
+ // verify present in others
+ for (VM vm : vms) {
+ vm.invoke(() -> assertEntryExists(cache, regionName));
+ }
+ }
+
+ public Properties getSystemProperties() {
+ Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
+ props.remove("disable-auto-reconnect");
+ props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
+ props.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+ props.remove(DistributionConfig.LOAD_CLUSTER_CONFIG_FROM_DIR_NAME);
+ props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
+ props.remove(DistributionConfig.LOCK_MEMORY_NAME);
+ return props;
+ }
+
+ public Properties getSystemProperties(int[] locatorPorts) {
+ Properties props = new Properties();
+ String locatorString = getLocatorString(locatorPorts);
+ props.setProperty("locators", locatorString);
+ props.setProperty("mcast-port", "0");
+ props.put(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
+ props.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+ props.remove(DistributionConfig.LOAD_CLUSTER_CONFIG_FROM_DIR_NAME);
+ props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
+ return props;
+ }
+}