You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2020/11/03 00:27:46 UTC

[geode] branch develop updated: GEODE-8672: No need in token mode if concurrencyChecksEnabled (#5691)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e695938  GEODE-8672: No need in token mode if concurrencyChecksEnabled (#5691)
e695938 is described below

commit e695938dff4b39f1755c707e81e1eb7e2e143fe0
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Mon Nov 2 16:26:52 2020 -0800

    GEODE-8672: No need in token mode if concurrencyChecksEnabled (#5691)
    
      * The DESTROYED token is only needed to prevent a concurrent destroy
        operation from being lost during GII (get initial image). If
        concurrency checks are enabled, the version tag should be sufficient
        to prevent the destroy operation from being lost.
---
 .../control/RebalanceOperationDistributedTest.java | 271 +++++++++++++++------
 .../apache/geode/internal/cache/LocalRegion.java   |   6 +-
 .../geode/internal/cache/LocalRegionTest.java      |  41 ++++
 3 files changed, 244 insertions(+), 74 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
index 3592003..605547b 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
@@ -39,6 +39,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,9 +51,14 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -70,13 +76,16 @@ import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheLoader;
 import org.apache.geode.cache.CacheLoaderException;
 import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.CommitConflictException;
 import org.apache.geode.cache.DiskStoreFactory;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.TransactionDataRebalancedException;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.control.RebalanceOperation;
@@ -98,10 +107,14 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PRHARedundancyProvider;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.RegionEntry;
 import org.apache.geode.internal.cache.SignalBounceOnRequestImageMessageObserver;
+import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
 import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
 import org.apache.geode.internal.cache.partitioned.LoadProbe;
+import org.apache.geode.internal.cache.versions.VersionStamp;
+import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -125,6 +138,12 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
 
   private static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds();
 
+  private VM vm0 = getVM(0);
+  private VM vm1 = getVM(1);
+  private VM vm2 = getVM(2);
+  private VM vm3 = getVM(3);
+  private boolean toSetBucketNumber = false;
+
   @Rule
   public DistributedRestoreSystemProperties restoreSystemProperties =
       new DistributedRestoreSystemProperties();
@@ -144,9 +163,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testRecoverRedundancy(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", 1));
 
@@ -215,9 +231,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testEnforceIP(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     for (VM vm : toArray(vm0, vm1)) {
       vm.invoke(() -> {
         Properties props = new Properties();
@@ -286,10 +299,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testEnforceZone(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     vm0.invoke(() -> setRedundancyZone("A"));
     vm1.invoke(() -> setRedundancyZone("A"));
 
@@ -371,10 +380,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
 
   @Test
   public void testEnforceZoneWithMultipleRegions() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     vm0.invoke(() -> setRedundancyZone("A"));
     vm1.invoke(() -> setRedundancyZone("A"));
 
@@ -463,10 +468,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testRecoverRedundancyBalancing(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPartitionedRegion("region1", 1, 200);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -534,10 +535,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
 
   @Test
   public void testRecoverRedundancyBalancingIfCreateBucketFails() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPartitionedRegion("region1", 1, 100);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -644,10 +641,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testRecoverRedundancyColocatedRegions(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPartitionedRegion("region1", 1, 200);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -751,10 +744,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
           () -> System.setProperty(GeodeGlossary.GEMFIRE_PREFIX + "LOG_REBALANCE", "true"));
     }
 
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     DistributedMember member1 = vm0.invoke(() -> {
       createPRRegionWithAsyncQueue(200);
       return getCache().getDistributedSystem().getDistributedMember();
@@ -857,9 +846,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
 
   @Test
   public void testCancelOperation() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", 1));
 
@@ -935,10 +921,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
    */
   @Test
   public void testMembershipChange() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", 0));
 
@@ -1018,9 +1000,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsNoRedundancy(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPartitionedRegion("region1", new NullReturningLoader()));
 
@@ -1116,9 +1095,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testFilterRegions(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     Set<String> included = new HashSet<>();
     included.add("region0");
     included.add("region1");
@@ -1238,10 +1214,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsWithRedundancy(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     // Create the region in two VMs
     vm0.invoke(() -> createPartitionedRegion("region1", 1));
     vm1.invoke(() -> createPartitionedRegion("region1", 1));
@@ -1332,11 +1304,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
    */
   @Test
   public void testMoveBucketsOverflowToDisk() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-    VM vm3 = getVM(3);
-
     // Create the region in two VMs
     vm0.invoke(
         () -> createPartitionedRegion("region1", createLRUEntryAttributes(1, OVERFLOW_TO_DISK)));
@@ -1501,10 +1468,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
    */
   @Test
   public void testMoveBucketsNestedPR() {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     // Create the region in two VMs
     for (VM vm : toArray(vm0, vm1)) {
       vm.invoke(() -> {
@@ -1597,10 +1560,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsColocatedRegions(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-
     vm0.invoke(() -> createPartitionedRegion("region1", 1, 200));
     vm0.invoke(() -> createPartitionedRegion("region2", 200, "region1"));
     vm1.invoke(() -> createPartitionedRegion("region1", 1, 200));
@@ -1707,8 +1666,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"PUT", "INVALIDATE", "DESTROY", "CACHE_LOADER"})
   @TestCaseName("{method}(operation={0})")
   public void runTestWaitForOperation(OperationEnum operation) throws Exception {
-    VM vm1 = getVM(1);
-
     // Create a region in this VM with a cache writer
     // and cache loader
     createPartitionedRegion("region1", 1, 100, (CacheLoader<Number, String>) helper1 -> "anobject");
@@ -1780,11 +1737,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @TestCaseName("{method}(simulate={0}, userAccessor={1})")
   public void testRecoverRedundancyWithOfflinePersistence(boolean simulate, boolean useAccessor)
       throws Exception {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-    VM vm2 = getVM(2);
-    VM vm3 = getVM(3);
-
     int redundantCopies = 1;
 
     // Create the region in only 2 VMs
@@ -1967,9 +1919,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testMoveBucketsWithUnrecoveredValues(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Create the region in only 1 VM
     vm0.invoke(() -> createPersistentPartitionedRegion("region1", "store", getDiskDirs(),
         new NullReturningLoader<>()));
@@ -2102,9 +2051,6 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Parameters({"true", "false"})
   @TestCaseName("{method}(simulate={0})")
   public void testBalanceBucketsByCount(boolean simulate) {
-    VM vm0 = getVM(0);
-    VM vm1 = getVM(1);
-
     // Cache is closed so loadProbeToRestore does not need to be restored
     vm0.invoke(
         () -> {
@@ -2356,6 +2302,9 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
     partitionAttributesFactory.setRedundantCopies(redundantCopies);
     partitionAttributesFactory.setRecoveryDelay(-1);
     partitionAttributesFactory.setStartupRecoveryDelay(-1);
+    if (toSetBucketNumber) {
+      partitionAttributesFactory.setTotalNumBuckets(totalNumberOfBuckets);
+    }
 
     RegionFactory regionFactory = getCache().createRegionFactory(PARTITION);
     regionFactory.setPartitionAttributes(partitionAttributesFactory.create());
@@ -2834,4 +2783,180 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
       closed = true;
     }
   }
+
+  private String regionName = "region";
+  private int numOfEntry = 2500;
+  private int totalNumberOfBuckets = 31;
+  private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
+
+  @Test
+  public void correctVersionGeneratedForConcurrentOperationsInTxWithRebalance() throws Exception {
+    toSetBucketNumber = true;
+    vm0.invoke(() -> createPartitionedRegion(regionName, 0));
+    vm0.invoke(() -> doPut("A"));
+    vm0.invoke(this::doDestroy);
+    vm0.invoke(() -> doPut("B"));
+
+    vm1.invoke(() -> createPartitionedRegion(regionName, 0));
+    doConcurrentOpsAndRebalance("C");
+    validateVersionsInVms(vm0, vm1);
+
+    vm2.invoke(() -> createPartitionedRegion(regionName, 0));
+    doConcurrentOpsAndRebalance("D");
+    validateVersionsInVms(vm0, vm1, vm2);
+
+    vm3.invoke(() -> createPartitionedRegion(regionName, 0));
+    doConcurrentOpsAndRebalance("E");
+    validateVersionsInVms(vm0, vm1, vm2, vm3);
+  }
+
+  private void doConcurrentOpsAndRebalance(String s) throws Exception {
+    AsyncInvocation async0 = vm0.invokeAsync(this::doConcurrentDestroyInTx);
+    AsyncInvocation async1 = vm1.invokeAsync(() -> doConcurrentPutInTx(s));
+    vm0.invoke(() -> doRebalance());
+    async0.await();
+    async1.await();
+  }
+
+  private void doRebalance() throws TimeoutException, InterruptedException {
+    InternalResourceManager manager = getCache().getInternalResourceManager();
+    doRebalance(false, manager);
+  }
+
+  private void doConcurrentPutInTx(String s) throws Exception {
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      queue.add(i);
+    }
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    Collection<Callable<Object>> tasks = new ArrayList<>();
+    Callable<Object> task = () -> {
+      doPutOpInTx(s);
+      return null;
+    };
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      tasks.add(task);
+    }
+
+    List<Future<Object>> futures = pool.invokeAll(tasks);
+    for (Future future : futures) {
+      future.get();
+    }
+  }
+
+  private void doConcurrentDestroyInTx() throws Exception {
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      queue.add(i);
+    }
+
+    ExecutorService pool = Executors.newCachedThreadPool();
+    Collection<Callable<Object>> tasks = new ArrayList<>();
+    Callable<Object> task = () -> {
+      doDestroyOpInTx();
+      return null;
+    };
+    for (int i = 0; i < totalNumberOfBuckets; i++) {
+      tasks.add(task);
+    }
+
+    List<Future<Object>> futures = pool.invokeAll(tasks);
+    for (Future future : futures) {
+      future.get();
+    }
+  }
+
+  private void doPutOpInTx(String s) {
+    int bucket;
+    if (!queue.isEmpty()) {
+      bucket = queue.poll();
+      Region<Number, String> region = getCache().getRegion(regionName);
+      for (int i = 0; i < numOfEntry; i++) {
+        if (i % totalNumberOfBuckets == bucket) {
+          doTXPut(region, i, s);
+        }
+      }
+    }
+  }
+
+  private void doDestroyOpInTx() {
+    int bucket;
+    if (!queue.isEmpty()) {
+      bucket = queue.poll();
+      Region<Number, String> region = getCache().getRegion(regionName);
+      for (int i = 0; i < numOfEntry; i++) {
+        if (i % totalNumberOfBuckets == bucket) {
+          doTxDestroy(region, i);
+        }
+      }
+    }
+  }
+
+  private void doPut(String s) {
+    Region<Number, String> region = getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      region.put(i, s);
+    }
+  }
+
+  private void doTXPut(Region<Number, String> region, int i, String s) {
+    TXManagerImpl manager = getCache().getTxManager();
+    manager.begin();
+    try {
+      region.put(i, s);
+      manager.commit();
+    } catch (TransactionDataRebalancedException e) {
+      if (manager.getTransactionId() != null) {
+        manager.rollback();
+      }
+    } catch (CommitConflictException ignore) {
+    }
+  }
+
+  private void doTxDestroy(Region<Number, String> region, int i) {
+    TXManagerImpl manager = getCache().getTxManager();
+    manager.begin();
+    try {
+      region.remove(i);
+      manager.commit();
+    } catch (TransactionDataRebalancedException e) {
+      if (manager.getTransactionId() != null) {
+        manager.rollback();
+      }
+    } catch (CommitConflictException ignore) {
+    }
+  }
+
+  private void doDestroy() {
+    Region<Number, String> region = getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      try {
+        region.destroy(i);
+      } catch (EntryNotFoundException ignore) {
+      }
+    }
+  }
+
+  private void validateVersionsInVms(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(this::validateEntryVersions);
+    }
+  }
+
+  private void validateEntryVersions() {
+    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName);
+    for (int i = 0; i < numOfEntry; i++) {
+      BucketRegion bucketRegion = region.getDataStore().getLocalBucketByKey(i);
+      if (bucketRegion != null) {
+        RegionEntry entry = bucketRegion.getRegionMap().getEntry(i);
+        if (entry != null) {
+          VersionStamp stamp = entry.getVersionStamp();
+          VersionTag tag = stamp.asVersionTag();
+          if (tag.getEntryVersion() < 3) {
+            logger.info("tag for key {} is " + tag, i);
+          }
+          assertThat(tag.getEntryVersion()).isGreaterThan(2);
+        }
+      }
+    }
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 61328f3..017329c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6836,7 +6836,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     final boolean needRIDestroyToken = inRI && riCnt > 0;
 
     try {
-      final boolean inTokenMode = needTokensForGII || needRIDestroyToken;
+      final boolean inTokenMode = isInTokenModeNeeded(needTokensForGII, needRIDestroyToken);
       entries.txApplyDestroy(key, rmtOrigin, event, inTokenMode, needRIDestroyToken, op,
           eventId, aCallbackArgument, pendingCallbacks, filterRoutingInfo, bridgeContext,
           isOriginRemote, txEntryState, versionTag, tailKey);
@@ -6847,6 +6847,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
+  boolean isInTokenModeNeeded(boolean needTokensForGII, boolean needRIDestroyToken) {
+    return !getConcurrencyChecksEnabled() && (needTokensForGII || needRIDestroyToken);
+  }
+
   /**
    * Called by lower levels, while still holding the write sync lock, and the low level has
    * completed its part of the basic destroy
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
index dd9cce1..60e8454 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java
@@ -277,4 +277,45 @@ public class LocalRegionTest {
 
     assertThat(region.isGenerateLocalFilterRoutingNeeded(event)).isFalse();
   }
+
+  @Test
+  public void isInTokenModeNeededReturnsFalseIfConcurrencyChecksEnabled() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+    doReturn(true).when(region).getConcurrencyChecksEnabled();
+
+    assertThat(region.isInTokenModeNeeded(true, true)).isFalse();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsFalseIfBothNeedTokensForGIIAndNeedRIDestroyTokenAreFalse() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+
+    assertThat(region.isInTokenModeNeeded(false, false)).isFalse();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsTrueIfConcurrencyChecksNotEnabledAndNeedTokensForGII() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+
+    assertThat(region.isInTokenModeNeeded(true, false)).isTrue();
+  }
+
+  @Test
+  public void isInTokenModeNeededReturnsTrueIfConcurrencyChecksNotEnabledAndNeedRIDestroyToken() {
+    LocalRegion region =
+        spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments,
+            internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory,
+            poolFinder, regionPerfStatsFactory, disabledClock()));
+
+    assertThat(region.isInTokenModeNeeded(false, true)).isTrue();
+  }
 }