You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/28 19:14:29 UTC
[geode] 02/23: GEODE-7682: add PR.clear API (#4755)
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 30eb806a6fbcfb1b8f8406c2c9c5e4b1c9cdc2e2
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Thu Mar 5 23:46:36 2020 -0800
GEODE-7682: add PR.clear API (#4755)
* GEODE-7683: introduce BR.cmnClearRegion
Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
.../cache/PartitionedRegionClearDUnitTest.java | 218 +++++++++++++++++++++
.../PartitionedRegionPersistentClearDUnitTest.java | 26 +++
...itionedRegionSingleNodeOperationsJUnitTest.java | 66 -------
.../codeAnalysis/sanctionedDataSerializables.txt | 4 +-
.../org/apache/geode/internal/DSFIDFactory.java | 3 +
.../geode/internal/cache/DistributedRegion.java | 9 -
.../apache/geode/internal/cache/LocalRegion.java | 10 +
.../geode/internal/cache/PartitionedRegion.java | 214 ++++++++++++++++++--
.../geode/internal/cache/RegionEventImpl.java | 5 +
.../internal/cache/partitioned/ClearPRMessage.java | 166 +++++-----------
.../cache/partitioned/ClearPRMessageTest.java | 50 ++---
11 files changed, 522 insertions(+), 249 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
new file mode 100644
index 0000000..fb2a81b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.Assert.fail;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getCache;
+import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCache;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Distributed tests for {@code Region.clear()} on a partitioned region.
+ *
+ * <p>
+ * Every test runs against one locator, three data stores, one accessor and two
+ * subscription-enabled clients. After the clear, the region must be empty on every server and the
+ * clear listener must have fired exactly once across all servers. Subclasses select a different
+ * region type by overriding {@link #getRegionShortCut()}.
+ */
+public class PartitionedRegionClearDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 1000;
+
+  /** Bucket count shared by the data stores and the accessor. */
+  private static final int TOTAL_NUM_BUCKETS = 10;
+
+  private static final Logger logger = LogManager.getLogger();
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, dataStore3, accessor;
+  protected ClientVM client1, client2;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(7);
+
+  /**
+   * Starts the locator (VM 0), three data stores (VMs 1-3), the accessor (VM 4) and two clients
+   * (VMs 5-6), then creates the test region on each of them.
+   */
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, getProperties(), locatorPort);
+    dataStore2 = cluster.startServerVM(2, getProperties(), locatorPort);
+    dataStore3 = cluster.startServerVM(3, getProperties(), locatorPort);
+    accessor = cluster.startServerVM(4, getProperties(), locatorPort);
+    client1 = cluster.startClientVM(5,
+        c -> c.withPoolSubscription(true).withLocatorConnection(locatorPort));
+    client2 = cluster.startClientVM(6,
+        c -> c.withPoolSubscription(true).withLocatorConnection(locatorPort));
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    dataStore3.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+  }
+
+  /**
+   * Returns the region type used by the data stores; subclasses override this to test other
+   * partitioned region types.
+   */
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT;
+  }
+
+  /** Returns the properties every server VM is started with. */
+  protected Properties getProperties() {
+    Properties properties = new Properties();
+    properties.setProperty("log-level", "info");
+    return properties;
+  }
+
+  /** Looks up the test region in the client cache or the server cache of the current VM. */
+  private Region<Object, Object> getRegion(boolean isClient) {
+    if (isClient) {
+      return getClientCache().getRegion(REGION_NAME);
+    }
+    return getCache().getRegion(REGION_NAME);
+  }
+
+  private void verifyRegionSize(boolean isClient, int expectedNum) {
+    assertThat(getRegion(isClient).size()).isEqualTo(expectedNum);
+  }
+
+  /** Creates a caching-proxy client region and registers interest in all keys. */
+  private void initClientCache() {
+    Region<Object, Object> region =
+        getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+            .create(REGION_NAME);
+    region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
+  }
+
+  /** Creates the partitioned region, with a counting clear listener, on a data store. */
+  private void initDataStore() {
+    getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(
+            new PartitionAttributesFactory<>().setTotalNumBuckets(TOTAL_NUM_BUCKETS).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  /**
+   * Creates the partitioned region on the accessor with {@code localMaxMemory} 0.
+   *
+   * <p>
+   * A persistent shortcut is first mapped to its non-persistent counterpart; any other persistent
+   * shortcut fails the test. (Presumably because a member that hosts no data cannot persist any -
+   * confirm against the region attribute validation.)
+   */
+  private void initAccessor() {
+    RegionShortcut shortcut = getRegionShortCut();
+    if (shortcut.isPersistent()) {
+      if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION;
+      } else if (shortcut == RegionShortcut.PARTITION_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_OVERFLOW;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT;
+      } else if (shortcut == RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW) {
+        shortcut = RegionShortcut.PARTITION_REDUNDANT_OVERFLOW;
+      } else {
+        fail("Wrong region type:" + shortcut);
+      }
+    }
+    // Partition attributes are set exactly once so that localMaxMemory(0) cannot be lost.
+    getCache().createRegionFactory(shortcut)
+        .setPartitionAttributes(new PartitionAttributesFactory<>()
+            .setTotalNumBuckets(TOTAL_NUM_BUCKETS).setLocalMaxMemory(0).create())
+        .addCacheListener(new CountingCacheListener())
+        .create(REGION_NAME);
+  }
+
+  /** Puts {@link #NUM_ENTRIES} entries, keyed 0..NUM_ENTRIES-1, into the region. */
+  private void feed(boolean isClient) {
+    Region<Object, Object> region = getRegion(isClient);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+  }
+
+  private void verifyServerRegionSize(int expectedNum) {
+    accessor.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore1.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore2.invoke(() -> verifyRegionSize(false, expectedNum));
+    dataStore3.invoke(() -> verifyRegionSize(false, expectedNum));
+  }
+
+  private void verifyClientRegionSize(int expectedNum) {
+    client1.invoke(() -> verifyRegionSize(true, expectedNum));
+    // TODO: notify register clients
+    // client2.invoke(()->verifyRegionSize(true, expectedNum));
+  }
+
+  /**
+   * Asserts that the clear listener fired exactly once across all four servers and, when
+   * {@code serverVM} is not null, that it fired on that particular server.
+   *
+   * @param serverVM the server expected to have received the callback, or null to skip that check
+   */
+  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
+    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
+      CountingCacheListener countingCacheListener =
+          (CountingCacheListener) getRegion(false).getAttributes()
+              .getCacheListeners()[0];
+      return countingCacheListener.getClears();
+    };
+
+    int count = accessor.invoke(getListenerTriggerCount)
+        + dataStore1.invoke(getListenerTriggerCount)
+        + dataStore2.invoke(getListenerTriggerCount)
+        + dataStore3.invoke(getListenerTriggerCount);
+    assertThat(count).isEqualTo(1);
+
+    if (serverVM != null) {
+      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
+    }
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore1.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(dataStore1);
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+    verifyCacheListenerTriggerCount(accessor);
+  }
+
+  @Test
+  public void normalClearFromClient() {
+    client1.invoke(() -> feed(true));
+    verifyClientRegionSize(NUM_ENTRIES);
+    verifyServerRegionSize(NUM_ENTRIES);
+
+    client1.invoke(() -> getRegion(true).clear());
+    verifyServerRegionSize(0);
+    verifyClientRegionSize(0);
+    verifyCacheListenerTriggerCount(null);
+  }
+
+  /** Cache listener that counts the {@code afterRegionClear} callbacks it receives. */
+  private static class CountingCacheListener extends CacheListenerAdapter<Object, Object> {
+    private final AtomicInteger clears = new AtomicInteger();
+
+    @Override
+    public void afterRegionClear(RegionEvent<Object, Object> event) {
+      Region<Object, Object> region = event.getRegion();
+      logger.info("Region {} is cleared.", region.getFullPath());
+      clears.incrementAndGet();
+    }
+
+    int getClears() {
+      return clears.get();
+    }
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
new file mode 100644
index 0000000..847699b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionPersistentClearDUnitTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+
+
+import org.apache.geode.cache.RegionShortcut;
+
+/**
+ * Runs every test inherited from {@link PartitionedRegionClearDUnitTest} against a region created
+ * with {@link RegionShortcut#PARTITION_REDUNDANT_PERSISTENT_OVERFLOW}.
+ */
+public class PartitionedRegionPersistentClearDUnitTest extends PartitionedRegionClearDUnitTest {
+
+  @Override
+  protected RegionShortcut getRegionShortCut() {
+    return RegionShortcut.PARTITION_REDUNDANT_PERSISTENT_OVERFLOW;
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
index b37945b..4f36060 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -1298,71 +1297,6 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest {
}
}
- @Test
- public void test023UnsupportedOps() throws Exception {
- Region pr = null;
- try {
- pr = PartitionedRegionTestHelper.createPartitionedRegion("testUnsupportedOps",
- String.valueOf(200), 0);
-
- pr.put(new Integer(1), "one");
- pr.put(new Integer(2), "two");
- pr.put(new Integer(3), "three");
- pr.getEntry("key");
-
- try {
- pr.clear();
- fail(
- "PartitionedRegionSingleNodeOperationTest:testUnSupportedOps() operation failed on a blank PartitionedRegion");
- } catch (UnsupportedOperationException expected) {
- }
-
- // try {
- // pr.entries(true);
- // fail();
- // }
- // catch (UnsupportedOperationException expected) {
- // }
-
- // try {
- // pr.entrySet(true);
- // fail();
- // }
- // catch (UnsupportedOperationException expected) {
- // }
-
- try {
- HashMap data = new HashMap();
- data.put("foo", "bar");
- data.put("bing", "bam");
- data.put("supper", "hero");
- pr.putAll(data);
- // fail("testPutAll() does NOT throw UnsupportedOperationException");
- } catch (UnsupportedOperationException onse) {
- }
-
-
- // try {
- // pr.values();
- // fail("testValues() does NOT throw UnsupportedOperationException");
- // }
- // catch (UnsupportedOperationException expected) {
- // }
-
-
- try {
- pr.containsValue("foo");
- } catch (UnsupportedOperationException ex) {
- fail("PartitionedRegionSingleNodeOperationTest:testContainsValue() operation failed");
- }
-
- } finally {
- if (pr != null) {
- pr.destroyRegion();
- }
- }
- }
-
/**
* This method validates size operations. It verifies that it returns correct size of the
* PartitionedRegion.
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 8e522a2..fb83c84 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1377,8 +1377,8 @@ fromData,27
toData,27
org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
-fromData,30
-toData,44
+fromData,19
+toData,36
org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
fromData,17
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 504e7d1..26d92c9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -289,6 +289,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
import org.apache.geode.internal.cache.partitioned.BucketSizeMessage;
import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage;
import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
@@ -985,6 +986,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+ serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class);
+ serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class);
serializer.registerDSFID(HOST_AND_PORT, HostAndPort.class);
serializer.registerDSFID(DISTRIBUTED_PING_MESSAGE, DistributedPingMessage.class);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 489d85a..84b5a3b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -192,10 +192,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
@MutableForTesting
public static boolean ignoreReconnect = false;
- /**
- * Lock to prevent multiple threads on this member from performing a clear at the same time.
- */
- private final Object clearLock = new Object();
private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true);
@MakeNotStatic
@@ -933,11 +929,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
}
}
- private void lockCheckReadiness() {
- cache.getCancelCriterion().checkCancelInProgress(null);
- checkReadiness();
- }
-
@Override
Object validatedDestroy(Object key, EntryEventImpl event)
throws TimeoutException, EntryNotFoundException, CacheWriterException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 61328f3..5865f60 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -471,6 +471,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
private final Lock clientMetaDataLock = new ReentrantLock();
/**
+ * Lock to prevent multiple threads on this member from performing a clear at the same time.
+ */
+ protected final Object clearLock = new Object();
+
+ /**
* Lock for updating the cache service profile for the region.
*/
private final Lock cacheServiceProfileUpdateLock = new ReentrantLock();
@@ -2749,6 +2754,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
checkRegionDestroyed(true);
}
+ /**
+ * Asks the cache's cancel criterion to check for a cancel in progress and then calls
+ * {@link #checkReadiness()} for this region, in that order.
+ *
+ * <p>
+ * NOTE(review): both checks presumably throw when the cache is closing or the region is no
+ * longer usable - confirm against their implementations.
+ */
+ protected void lockCheckReadiness() {
+ cache.getCancelCriterion().checkCancelInProgress(null);
+ checkReadiness();
+ }
+
/**
* This method should be called when the caller cannot locate an entry and that condition is
* unexpected. This will first double check the cache and region state before throwing an
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 8411a13..1aa427a 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -181,6 +181,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa
import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
import org.apache.geode.internal.cache.partitioned.DestroyMessage;
@@ -2173,18 +2174,202 @@ public class PartitionedRegion extends LocalRegion
throw new UnsupportedOperationException();
}
- /**
- * @since GemFire 5.0
- * @throws UnsupportedOperationException OVERRIDES
- */
@Override
- public void clear() {
- throw new UnsupportedOperationException();
+ void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ synchronized (clearLock) {
+ final DistributedLockService lockService = getPartitionedRegionLockService();
+ try {
+ lockService.lock("_clearOperation" + this.getFullPath().replace('/', '_'), -1, -1);
+ } catch (IllegalStateException e) {
+ lockCheckReadiness();
+ throw e;
+ }
+ try {
+ if (cache.isCacheAtShutdownAll()) {
+ throw cache.getCacheClosedException("Cache is shutting down");
+ }
+
+ // create ClearPRMessage per bucket
+ List<ClearPRMessage> clearMsgList = createClearPRMessages();
+ for (ClearPRMessage clearPRMessage : clearMsgList) {
+ int bucketId = clearPRMessage.getBucketId();
+ checkReadiness();
+ long sendMessagesStartTime = 0;
+ if (isDebugEnabled) {
+ sendMessagesStartTime = System.currentTimeMillis();
+ }
+ try {
+ sendClearMsgByBucket(bucketId, clearPRMessage);
+ } catch (PartitionOfflineException poe) {
+ // TODO add a PartialResultException
+ logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket "
+ + bucketId, poe);
+ } catch (Exception e) {
+ logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e);
+ }
+
+ if (isDebugEnabled) {
+ long now = System.currentTimeMillis();
+ logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId,
+ (now - sendMessagesStartTime));
+ }
+ // TODO add psStats
+ }
+ } finally {
+ try {
+ lockService.unlock("_clearOperation" + this.getFullPath().replace('/', '_'));
+ } catch (IllegalStateException e) {
+ lockCheckReadiness();
+ }
+ }
+
+ // notify bridge clients at PR level
+ regionEvent.setEventType(EnumListenerEvent.AFTER_REGION_CLEAR);
+ boolean hasListener = hasListener();
+ if (hasListener) {
+ dispatchListenerEvent(EnumListenerEvent.AFTER_REGION_CLEAR, regionEvent);
+ }
+ notifyBridgeClients(regionEvent);
+ logger.info("Partitioned region {} finsihed clear operation.", this.getFullPath());
+ }
}
- @Override
- void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
- throw new UnsupportedOperationException();
+ /**
+  * Clears a single bucket by running the given message on the member selected for writes to
+  * that bucket, retrying until the clear reports success, the bucket is found not to exist, or
+  * the retry timeout elapses.
+  *
+  * @param bucketId id of the bucket to clear
+  * @param clearPRMessage the clear message for that bucket; executed locally when this member
+  *        is the target and hosts data, otherwise sent to the target member
+  */
+ void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
+   RetryTimeKeeper retryTime = null;
+   InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
+   if (logger.isDebugEnabled()) {
+     logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId,
+         currentTarget);
+   }
+
+   long timeOut = 0;
+   int count = 0;
+   while (true) {
+     switch (count) {
+       case 0:
+         // Note we don't check for DM cancellation in common case.
+         // First time. Assume success, keep going.
+         break;
+       case 1:
+         this.cache.getCancelCriterion().checkCancelInProgress(null);
+         // Second time (first failure). Calculate timeout and keep going.
+         timeOut = System.currentTimeMillis() + this.retryTimeout;
+         break;
+       default:
+         this.cache.getCancelCriterion().checkCancelInProgress(null);
+         // test for timeout
+         long timeLeft = timeOut - System.currentTimeMillis();
+         if (timeLeft < 0) {
+           PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket " + bucketId,
+               this.retryTimeout);
+           // NOTREACHED
+         }
+
+         // Didn't time out. Sleep a bit and then continue
+         boolean interrupted = Thread.interrupted();
+         try {
+           Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
+         } catch (InterruptedException ignore) {
+           interrupted = true;
+         } finally {
+           // Restore the interrupt status that was cleared above or by sleep().
+           if (interrupted) {
+             Thread.currentThread().interrupt();
+           }
+         }
+         break;
+     } // switch
+     count++;
+
+     if (currentTarget == null) { // pick target
+       checkReadiness();
+       if (retryTime == null) {
+         retryTime = new RetryTimeKeeper(this.retryTimeout);
+       }
+
+       // createIfNotExist is false: a clear must not create a bucket that does not exist yet.
+       currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
+       if (currentTarget == null) {
+         // the bucket does not exist, no need to clear
+         logger.info("Bucket " + bucketId + " does not contain data, no need to clear");
+         return;
+       } else {
+         if (logger.isDebugEnabled()) {
+           logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget);
+         }
+       }
+
+       // It's possible this is a GemFire thread e.g. ServerConnection
+       // which got to this point because of a distributed system shutdown or
+       // region closure which uses interrupt to break any sleep() or wait() calls
+       // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
+       checkShutdown();
+       continue;
+     } // pick target
+
+     boolean result = false;
+     try {
+       final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId());
+       if (isLocal) {
+         result = clearPRMessage.doLocalClear(this);
+       } else {
+         ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
+         if (response != null) {
+           this.prStats.incPartitionMessagesSent();
+           result = response.waitForResult();
+         }
+       }
+       if (result) {
+         return;
+       }
+     } catch (ForceReattemptException fre) {
+       checkReadiness();
+       InternalDistributedMember lastTarget = currentTarget;
+       if (retryTime == null) {
+         retryTime = new RetryTimeKeeper(this.retryTimeout);
+       }
+       currentTarget = getNodeForBucketWrite(bucketId, retryTime);
+       if (lastTarget.equals(currentTarget)) {
+         if (logger.isDebugEnabled()) {
+           logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}",
+               currentTarget, fre.getMessage());
+         }
+         if (retryTime.overMaximum()) {
+           PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket " + bucketId,
+               this.retryTimeout);
+           // NOTREACHED
+         }
+         retryTime.waitToRetryNode();
+       } else {
+         if (logger.isDebugEnabled()) {
+           logger.debug("PR.sendClearMsgByBucket: Old target was {}, Retrying {}", lastTarget,
+               currentTarget);
+         }
+       }
+     }
+
+     // It's possible this is a GemFire thread e.g. ServerConnection
+     // which got to this point because of a distributed system shutdown or
+     // region closure which uses interrupt to break any sleep() or wait()
+     // calls
+     // e.g. waitForPrimary or waitForBucketRecovery in which case throw
+     // exception
+     checkShutdown();
+
+     // If we get here, the attempt failed...
+     if (count == 1) {
+       // TODO prStats add ClearPRMsg retried
+       // Until a clear-specific statistic exists, the retry is counted as a putAll retry.
+       this.prStats.incPutAllMsgsRetried();
+     }
+   }
+ }
+
+ List<ClearPRMessage> createClearPRMessages() {
+ ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
+ for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
+ ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId);
+ clearMsgList.add(clearPRMessage);
+ }
+ return clearMsgList;
}
@Override
@@ -2603,7 +2788,7 @@ public class PartitionedRegion extends LocalRegion
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+ currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
if (isDebugEnabled) {
logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
getEntrySize(event), currentTarget);
@@ -2742,7 +2927,7 @@ public class PartitionedRegion extends LocalRegion
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+ currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
if (logger.isDebugEnabled()) {
logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}",
getEntrySize(event), currentTarget);
@@ -2987,7 +3172,7 @@ public class PartitionedRegion extends LocalRegion
if (retryTime == null) {
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
- currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+ currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
// It's possible this is a GemFire thread e.g. ServerConnection
// which got to this point because of a distributed system shutdown or
@@ -3146,10 +3331,11 @@ public class PartitionedRegion extends LocalRegion
* @param retryTime the RetryTimeKeeper to track retry times
* @param event the event used to get the entry size in the event a new bucket should be created
* @param bucketId the identity of the bucket should it be created
+ * @param createIfNotExist whether to create the bucket if it does not already exist
* @return a Node which contains the bucket, potentially null
*/
private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime,
- EntryEventImpl event, Integer bucketId) {
+ EntryEventImpl event, Integer bucketId, boolean createIfNotExist) {
InternalDistributedMember newNode;
if (retryTime.overMaximum()) {
PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket",
@@ -3159,7 +3345,7 @@ public class PartitionedRegion extends LocalRegion
retryTime.waitForBucketsRecovery();
newNode = getNodeForBucketWrite(bucketId, retryTime);
- if (newNode == null) {
+ if (newNode == null && createIfNotExist) {
newNode = createBucket(bucketId, getEntrySize(event), retryTime);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
index fba513d..49dc932 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
@@ -119,6 +119,11 @@ public class RegionEventImpl
return region;
}
+ /**
+ * Replaces the region of this event and sets {@code distributedMember} to the id returned by
+ * the new region's {@code getMyId()}.
+ *
+ * @param region the region to associate with this event; must not be null, because it is
+ * dereferenced to obtain the member id
+ */
+ public void setRegion(LocalRegion region) {
+ this.region = region;
+ this.distributedMember = region.getMyId();
+ }
+
@Override
public Operation getOperation() {
return this.op;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 1a8aba1..9fa8057 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -26,7 +26,8 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheException;
-import org.apache.geode.distributed.DistributedLockService;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
@@ -44,7 +45,6 @@ import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.RegionEventImpl;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
@@ -54,16 +54,10 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
public class ClearPRMessage extends PartitionMessageWithDirectReply {
private static final Logger logger = LogService.getLogger();
- private RegionEventImpl regionEvent;
-
private Integer bucketId;
- /** The time in ms to wait for a lock to be obtained during doLocalClear() */
- public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
public static final String BUCKET_NON_PRIMARY_MESSAGE =
"The bucket region on target member is no longer primary";
- public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE =
- "A lock for the bucket region could not be obtained.";
public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION =
"An exception was thrown during the local clear operation: ";
@@ -79,14 +73,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
public ClearPRMessage(int bucketId) {
this.bucketId = bucketId;
-
- // These are both used by the parent class, but don't apply to this message type
- this.notificationOnly = false;
- this.posDup = false;
- }
-
- public void setRegionEvent(RegionEventImpl event) {
- regionEvent = event;
}
public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
@@ -103,16 +89,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
}
}
- @Override
- public boolean isSevereAlertCompatible() {
- // allow forced-disconnect processing for all cache op messages
- return true;
- }
-
- public RegionEventImpl getRegionEvent() {
- return regionEvent;
- }
-
public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
throws ForceReattemptException {
Set<InternalDistributedMember> recipients =
@@ -125,7 +101,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this);
if (failures != null && failures.size() > 0) {
- throw new ForceReattemptException("Failed sending <" + this + ">");
+ throw new ForceReattemptException("Failed sending <" + this + "> due to " + failures);
}
return clearResponse;
}
@@ -143,7 +119,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
} else {
InternalDataSerializer.writeSignedVL(bucketId, out);
}
- DataSerializer.writeObject(regionEvent, out);
}
@Override
@@ -151,12 +126,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
- this.regionEvent = DataSerializer.readObject(in);
}
@Override
public EventID getEventID() {
- return regionEvent.getEventId();
+ return null;
}
/**
@@ -169,60 +143,51 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
PartitionedRegion region, long startTime) {
try {
- result = doLocalClear(region);
+ this.result = doLocalClear(region);
} catch (ForceReattemptException ex) {
sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
startTime);
return false;
}
- sendReply(getSender(), getProcessorId(), distributionManager, null, region, startTime);
- return false;
+ return this.result;
}
- public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+ public Integer getBucketId() {
+ return this.bucketId;
+ }
+
+ public boolean doLocalClear(PartitionedRegion region)
+ throws ForceReattemptException {
// Retrieve local bucket region which matches target bucketId
- BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+ BucketRegion bucketRegion =
+ region.getDataStore().getInitializedBucketForId(null, this.bucketId);
- // Check if we are primary, throw exception if not
- if (!bucketRegion.isPrimary()) {
+ boolean lockedForPrimary = bucketRegion.doLockForPrimary(false);
+ // Check if we obtained primary lock, throw exception if not
+ if (!lockedForPrimary) {
throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
}
-
- DistributedLockService lockService = getPartitionRegionLockService();
- String lockName = bucketRegion.getFullPath();
try {
- boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1);
-
- if (!locked) {
- throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
- }
-
- // Double check if we are still primary, as this could have changed between our first check
- // and obtaining the lock
- if (!bucketRegion.isPrimary()) {
- throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
- }
-
- try {
- bucketRegion.cmnClearRegion(regionEvent, true, true);
- } catch (Exception ex) {
- throw new ForceReattemptException(
- EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
- }
-
+ RegionEventImpl regionEvent = new RegionEventImpl();
+ regionEvent.setOperation(Operation.REGION_CLEAR);
+ regionEvent.setRegion(bucketRegion);
+ bucketRegion.cmnClearRegion(regionEvent, true, true);
+ } catch (PartitionOfflineException poe) {
+ logger.info(
+ "All members holding data for bucket {} are offline, no more retries will be attempted",
+ this.bucketId,
+ poe);
+ throw poe;
+ } catch (Exception ex) {
+ throw new ForceReattemptException(
+ EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
} finally {
- lockService.unlock(lockName);
+ bucketRegion.doUnlockForPrimary();
}
return true;
}
- // Extracted for testing
- protected DistributedLockService getPartitionRegionLockService() {
- return DistributedLockService
- .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
- }
-
@Override
public boolean canStartRemoteTransaction() {
return false;
@@ -247,39 +212,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
buff.append("; bucketId=").append(this.bucketId);
}
- @Override
- public String toString() {
- StringBuilder buff = new StringBuilder();
- String className = getClass().getName();
- buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo>
- buff.append("(prid="); // make sure this is the first one
- buff.append(this.regionId);
-
- // Append name, if we have it
- String name = null;
- try {
- PartitionedRegion region = PartitionedRegion.getPRFromId(this.regionId);
- if (region != null) {
- name = region.getFullPath();
- }
- } catch (Exception ignore) {
- /* ignored */
- }
- if (name != null) {
- buff.append(" (name = \"").append(name).append("\")");
- }
-
- appendFields(buff);
- buff.append(" ,distTx=");
- buff.append(this.isTransactionDistributed);
- buff.append(")");
- return buff.toString();
- }
-
public static class ClearReplyMessage extends ReplyMessage {
- /** Result of the Clear operation */
- boolean result;
-
@Override
public boolean getInlineProcess() {
return true;
@@ -293,16 +226,21 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
private ClearReplyMessage(int processorId, boolean result, ReplyException ex) {
super();
- this.result = result;
setProcessorId(processorId);
- setException(ex);
+ if (ex != null) {
+ setException(ex);
+ } else {
+ setReturnValue(result);
+ }
}
- /** Send an ack */
+ /**
+ * Send an ack
+ */
public static void send(InternalDistributedMember recipient, int processorId,
ReplySender replySender,
boolean result, ReplyException ex) {
- Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message");
+ Assert.assertNotNull(recipient, "ClearReplyMessage recipient was NULL.");
ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex);
message.setRecipient(recipient);
replySender.putOutgoing(message);
@@ -340,23 +278,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
}
@Override
- public void fromData(DataInput in,
- DeserializationContext context) throws IOException, ClassNotFoundException {
- super.fromData(in, context);
- this.result = in.readBoolean();
- }
-
- @Override
- public void toData(DataOutput out,
- SerializationContext context) throws IOException {
- super.toData(out, context);
- out.writeBoolean(this.result);
- }
-
- @Override
public String toString() {
- return "ClearReplyMessage " + "processorid=" + this.processorId + " returning " + this.result
- + " exception=" + getException();
+ StringBuilder stringBuilder = new StringBuilder(super.toString());
+ stringBuilder.append(" returnValue=");
+ stringBuilder.append(getReturnValue());
+ return stringBuilder.toString();
}
}
@@ -372,7 +298,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
}
public void setResponse(ClearReplyMessage response) {
- this.returnValue = response.result;
+ if (response.getException() == null) {
+ this.returnValue = (boolean) response.getReturnValue();
+ }
}
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
index 2cf5231..acdd4fc 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doNothing;
@@ -38,7 +37,6 @@ import java.util.Set;
import org.junit.Before;
import org.junit.Test;
-import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionManager;
@@ -50,6 +48,7 @@ import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.internal.cache.RegionEventImpl;
public class ClearPRMessageTest {
@@ -61,11 +60,14 @@ public class ClearPRMessageTest {
@Before
public void setup() throws ForceReattemptException {
message = spy(new ClearPRMessage());
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
dataStore = mock(PartitionedRegionDataStore.class);
when(region.getDataStore()).thenReturn(dataStore);
+ when(region.getFullPath()).thenReturn("/test");
bucketRegion = mock(BucketRegion.class);
when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+ RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class);
}
@Test
@@ -79,44 +81,19 @@ public class ClearPRMessageTest {
@Test
public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
- DistributedLockService mockLockService = mock(DistributedLockService.class);
- doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
- when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
- when(bucketRegion.isPrimary()).thenReturn(true);
-
- assertThatThrownBy(() -> message.doLocalClear(region))
- .isInstanceOf(ForceReattemptException.class)
- .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
- }
-
- @Test
- public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
- DistributedLockService mockLockService = mock(DistributedLockService.class);
- doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
- // Be primary on the first check, then be not primary on the second check
- when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
- when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+ when(bucketRegion.doLockForPrimary(false)).thenReturn(false);
assertThatThrownBy(() -> message.doLocalClear(region))
.isInstanceOf(ForceReattemptException.class)
.hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
- // Confirm that we actually obtained and released the lock
- verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
- verify(mockLockService, times(1)).unlock(any());
}
@Test
public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
- DistributedLockService mockLockService = mock(DistributedLockService.class);
- doReturn(mockLockService).when(message).getPartitionRegionLockService();
NullPointerException exception = new NullPointerException("Error encountered");
doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
- // Be primary on the first check, then be not primary on the second check
- when(bucketRegion.isPrimary()).thenReturn(true);
- when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+ when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
assertThatThrownBy(() -> message.doLocalClear(region))
.isInstanceOf(ForceReattemptException.class)
@@ -129,21 +106,13 @@ public class ClearPRMessageTest {
@Test
public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
throws ForceReattemptException {
- DistributedLockService mockLockService = mock(DistributedLockService.class);
- doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
// Be primary on the first check, then be not primary on the second check
- when(bucketRegion.isPrimary()).thenReturn(true);
- when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+ when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
assertThat(message.doLocalClear(region)).isTrue();
// Confirm that cmnClearRegion was called
verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-
- // Confirm that we actually obtained and released the lock
- verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
- verify(mockLockService, times(1)).unlock(any());
}
@Test
@@ -197,6 +166,7 @@ public class ClearPRMessageTest {
int processorId = 1000;
int startTime = 0;
+ doReturn(0).when(message).getBucketId();
doReturn(true).when(message).doLocalClear(region);
doReturn(sender).when(message).getSender();
doReturn(processorId).when(message).getProcessorId();
@@ -206,8 +176,9 @@ public class ClearPRMessageTest {
doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong());
message.operateOnPartitionedRegion(distributionManager, region, startTime);
+ assertThat(message.result).isTrue();
- verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region,
+ verify(message, times(0)).sendReply(sender, processorId, distributionManager, null, region,
startTime);
}
@@ -222,6 +193,7 @@ public class ClearPRMessageTest {
ForceReattemptException exception =
new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
+ doReturn(0).when(message).getBucketId();
doThrow(exception).when(message).doLocalClear(region);
doReturn(sender).when(message).getSender();
doReturn(processorId).when(message).getProcessorId();