You are viewing a plain-text version of this content; the link to its canonical version was lost in the plain-text conversion.
Posted to commits@geode.apache.org by mh...@apache.org on 2021/08/24 16:59:01 UTC

[geode] 01/02: first cut.

This is an automated email from the ASF dual-hosted git repository.

mhanson pushed a commit to branch rebalance_bug
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 120b6620d0cb1caaf9204be279d88fdc95a92a29
Author: Mark Hanson <ha...@vmware.com>
AuthorDate: Tue Aug 24 01:24:07 2021 -0700

    first cut.
---
 .../control/RebalanceOperationDistributedTest.java | 113 +++++++++++++++++++++
 .../internal/cache/PartitionedRegionDataStore.java |  33 ++++++
 .../control/PartitionRebalanceDetailsImpl.java     |  21 ++++
 .../partitioned/PartitionedRegionRebalanceOp.java  |   4 +
 4 files changed, 171 insertions(+)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
index 94bd61c..3a1bb5e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
@@ -54,6 +54,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 import junitparams.JUnitParamsRunner;
 import junitparams.Parameters;
@@ -76,6 +77,7 @@ import org.apache.geode.cache.LoaderHelper;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.control.RebalanceOperation;
@@ -108,6 +110,7 @@ import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.dunit.cache.CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
 import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 import org.apache.geode.util.internal.GeodeGlossary;
@@ -131,6 +134,9 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
   @Rule
   public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
 
+  @Rule // NOTE(review): nothing in this patch creates disk stores; confirm it is needed.
+  public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule();
+
   @After
   public void tearDown() {
     invokeInEveryVM(() -> {
@@ -368,6 +374,88 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
     }
   }
 
+  /**
+   * Verifies that redundancy zones are still honored after a zone "A" member is forcibly
+   * bounced, with rebalances run both while it is gone and after it rejoins.
+   *
+   * <p>Six servers are split into zone "A" (vm0-vm2) and zone "B" (vm3-vm5). After the final
+   * rebalance each zone must hold exactly one copy of every bucket of "region1" (113 buckets,
+   * one redundant copy), and no single server may host more than 38 of those buckets.
+   */
+  @Test
+  public void testEnforceZoneWithSixServersAndTwoZones() {
+    VM vm0 = getVM(0);
+    VM vm1 = getVM(1);
+    VM vm2 = getVM(2);
+    VM vm3 = getVM(3);
+    VM vm4 = getVM(4);
+    VM vm5 = getVM(5);
+    Stream.of(vm0, vm1, vm2).forEach(vm -> vm.invoke(() -> setRedundancyZone("A")));
+    Stream.of(vm3, vm4, vm5).forEach(vm -> vm.invoke(() -> setRedundancyZone("B")));
+
+    Stream.of(vm0, vm1, vm2, vm3, vm4, vm5).forEach(vm -> {
+      vm.invoke(() -> createPartitionedRegion("region1", 1, 113));
+      vm.invoke(() -> createPartitionedRegion("region1Ancillary", 1, 113));
+    });
+
+    // Populate both regions, which creates their buckets.
+    vm0.invoke(() -> {
+      Map<Integer, String> mapOfData = new HashMap<>();
+      for (int i = 0; i < 100000; i++) {
+        mapOfData.put(i, "A");
+      }
+      Region<Integer, String> region = getCache().getRegion("region1");
+      region.putAll(mapOfData);
+      Region<Integer, String> region2 = getCache().getRegion("region1Ancillary");
+      region2.putAll(mapOfData);
+    });
+
+    // Expect 113 buckets at redundancy 1 and no low-redundancy buckets.
+    vm0.invoke(() -> validateRedundancy("region1", 113, 1, 0));
+
+    vm0.invoke(() -> {
+      InternalResourceManager manager = getCache().getInternalResourceManager();
+      RebalanceResults results = doRebalance(false, manager);
+      logger.info("Rebalance 1 Results = " + results);
+      validateStatistics(manager, results);
+    });
+
+    // Forcibly bounce a zone "A" member and rebalance while it is gone.
+    vm2.bounceForcibly();
+
+    vm0.invoke(() -> {
+      InternalResourceManager manager = getCache().getInternalResourceManager();
+      RebalanceResults results = doRebalance(false, manager);
+      logger.info("Rebalance 2 Results = " + results);
+    });
+
+    // Bring vm2 back into zone "A" with "region1". NOTE(review): "region1Ancillary" is not
+    // recreated on vm2 - confirm that is intentional.
+    vm2.invoke(() -> {
+      setRedundancyZone("A");
+      createPartitionedRegion("region1", 1, 113);
+    });
+
+    vm0.invoke(() -> {
+      InternalResourceManager manager = getCache().getInternalResourceManager();
+      RebalanceResults results = doRebalance(false, manager);
+      logger.info("Rebalance 3 Results = " + results);
+    });
+
+    // Each zone should host one copy of each of the 113 buckets.
+    int zoneA = Stream.of(vm0, vm1, vm2)
+        .mapToInt(vm -> vm.invoke(() -> getBucketCount("region1")))
+        .sum();
+    int zoneB = Stream.of(vm3, vm4, vm5)
+        .mapToInt(vm -> vm.invoke(() -> getBucketCount("region1")))
+        .sum();
+    assertThat(zoneA).isEqualTo(zoneB).isEqualTo(113);
+
+    // 2 x 113 = 226 bucket copies over 6 servers is ~37.7 each, so no server may exceed 38.
+    Stream.of(vm0, vm1, vm2, vm3, vm4, vm5)
+        .forEach(vm -> vm.invoke(() -> validateBucketCountLessThan("region1", 38)));
+  }
+
   @Test
   public void testEnforceZoneWithMultipleRegions() {
     VM vm0 = getVM(0);
@@ -2349,6 +2438,27 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
     regionFactory.create(regionName);
   }
 
+  /**
+   * Creates a partitioned region based on {@code regionShortcut}, with automatic redundancy
+   * recovery disabled (recovery and startup-recovery delays of -1).
+   *
+   * @param regionName name of the region to create
+   * @param redundantCopies number of redundant copies of each bucket
+   * @param numBuckets total number of buckets
+   * @param regionShortcut shortcut supplying the region's base attributes
+   */
+  private void createPartitionedRegion(String regionName, int redundantCopies,
+      final int numBuckets, final RegionShortcut regionShortcut) {
+    PartitionAttributesFactory partitionAttributes = new PartitionAttributesFactory()
+        .setRedundantCopies(redundantCopies)
+        .setRecoveryDelay(-1)
+        .setStartupRecoveryDelay(-1)
+        .setTotalNumBuckets(numBuckets);
+    getCache().createRegionFactory(regionShortcut)
+        .setPartitionAttributes(partitionAttributes.create())
+        .create(regionName);
+  }
+
   private void createPartitionedRegion(String regionName, int redundantCopies) {
     PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
     partitionAttributesFactory.setRedundantCopies(redundantCopies);
@@ -2648,12 +2750,30 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
     assertThat(details.getLowRedundancyBucketCount()).isEqualTo(expectedLowRedundancyBucketCount);
   }
 
+  /**
+   * Returns how many buckets of {@code regionName} are hosted locally by this member.
+   */
+  private int getBucketCount(String regionName) {
+    PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName);
+    return region.getLocalBucketsListTestOnly().size();
+  }
+
   private void validateBucketCount(String regionName, int numLocalBuckets) {
     PartitionedRegion region = (PartitionedRegion) getCache().getRegion(regionName);
 
     assertThat(region.getLocalBucketsListTestOnly()).hasSize(numLocalBuckets);
   }
 
+  /**
+   * Asserts that this member hosts at most {@code maxLocalBuckets} buckets of
+   * {@code regionName}. Despite the method name, the bound is inclusive.
+   */
+  private void validateBucketCountLessThan(String regionName, int maxLocalBuckets) {
+    assertThat(getBucketCount(regionName))
+        .as("number of local buckets of region %s", regionName)
+        .isLessThanOrEqualTo(maxLocalBuckets);
+  }
+
   private void validateStatistics(InternalResourceManager manager, RebalanceResults results) {
     ResourceManagerStats stats = manager.getStats();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index d687d4c..c7f2ae1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -215,6 +215,45 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
             "RegionStats-partition-" + pr.getName(), pr.getCachePerfStats(), pr,
             pr.getCache().getMeterRegistry(), statisticsClock);
     this.keysOfInterest = new ConcurrentHashMap();
+    launchBucketIdsLogThread();
+  }
+
+  /**
+   * Debugging aid: if debug logging is enabled, starts a daemon thread that logs the ids
+   * of the buckets hosted locally for this region whenever they change. The thread exits once
+   * the region is destroyed or the thread is interrupted.
+   */
+  private void launchBucketIdsLogThread() {
+    if (!logger.isDebugEnabled()) {
+      return;
+    }
+    Thread thread = new Thread(this::logBucketIds, "BucketIdsLogger-" + getName());
+    thread.setDaemon(true);
+    thread.start();
+  }
+
+  /**
+   * Polls the local bucket ids every two seconds and logs them at debug level when they differ
+   * from the previous poll. Returns when the region is destroyed or the thread is interrupted.
+   */
+  private void logBucketIds() {
+    List<Integer> previousBucketIds = Collections.emptyList();
+    while (!this.partitionedRegion.isDestroyed()) {
+      List<Integer> currentBucketIds = new ArrayList<>(getAllLocalBucketIds());
+      Collections.sort(currentBucketIds);
+      if (!currentBucketIds.equals(previousBucketIds)) {
+        logger.debug("region={}; bucketIdsSize={}; bucketIds={}", getName(),
+            currentBucketIds.size(), currentBucketIds);
+        previousBucketIds = currentBucketIds;
+      }
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status and stop logging.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
   }
 
   /**
@@ -793,6 +817,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
               .setPartitionedRegion(this.partitionedRegion)
               .setIndexes(getIndexes(rootRegion.getFullPath(), bucketRegionName)));
       this.partitionedRegion.getPrStats().incBucketCount(1);
+      if (logger.isDebugEnabled()) {
+        logger.debug("PartitionedRegionDataStore.createBucketRegion region={}",
+            bucketRegion.getName());
+      }
     } catch (RegionExistsException ex) {
       // Bucket Region is already created, so do nothing.
       if (logger.isDebugEnabled()) {
@@ -1644,6 +1673,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
       }
       this.localBucket2RegionMap.remove(Integer.valueOf(bucketId));
       this.partitionedRegion.getPrStats().incBucketCount(-1);
+      if (logger.isDebugEnabled()) {
+        logger.debug("PartitionedRegionDataStore.removeBucket region={}",
+            bucketRegion.getName());
+      }
       return true;
     } finally {
       lock.unlock();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
index 8a6c4f9..b125ae4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
@@ -47,6 +47,32 @@ public class PartitionRebalanceDetailsImpl
   private final transient PartitionedRegion region;
   private long time;
 
+  /**
+   * Returns a diagnostic string with the bucket create/remove/transfer and primary transfer
+   * statistics, the before/after partition member details, the region and {@code time}.
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder("PartitionRebalanceDetailsImpl{")
+        .append("bucketCreateBytes=").append(bucketCreateBytes)
+        .append(", bucketCreateTime=").append(bucketCreateTime)
+        .append(", bucketCreatesCompleted=").append(bucketCreatesCompleted)
+        .append(", bucketRemoveBytes=").append(bucketRemoveBytes)
+        .append(", bucketRemoveTime=").append(bucketRemoveTime)
+        .append(", bucketRemovesCompleted=").append(bucketRemovesCompleted)
+        .append(", bucketTransferBytes=").append(bucketTransferBytes)
+        .append(", bucketTransferTime=").append(bucketTransferTime)
+        .append(", bucketTransfersCompleted=").append(bucketTransfersCompleted)
+        .append(", partitionMemberDetailsAfter=").append(partitionMemberDetailsAfter)
+        .append(", partitionMemberDetailsBefore=").append(partitionMemberDetailsBefore)
+        .append(", primaryTransferTime=").append(primaryTransferTime)
+        .append(", primaryTransfersCompleted=").append(primaryTransfersCompleted)
+        .append(", region=").append(region)
+        .append(", time=").append(time)
+        .append('}')
+        .toString();
+  }
+
   public PartitionRebalanceDetailsImpl(PartitionedRegion region) {
     this.region = region;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index e486e78..fc7b75b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -528,6 +528,11 @@ public class PartitionedRegionRebalanceOp {
       removed = getLeaderRegion().getDataStore().removeBucket(bucketId, false);
     } else {
       // send message to remote member...
+      if (logger.isDebugEnabled()) {
+        // The trailing exception is logged as a stack trace that identifies the caller.
+        logger.debug("PartitionedRegionRebalanceOp.removeRedundantBucketForRegion target={}; "
+            + "region={}; bucketId={}", target, getLeaderRegion(), bucketId, new Exception());
+      }
       RemoveBucketResponse response =
           RemoveBucketMessage.send(target, getLeaderRegion(), bucketId, false);
       if (response != null) {