You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2021/08/24 16:59:01 UTC
[geode] 01/02: first cut.
This is an automated email from the ASF dual-hosted git repository.
mhanson pushed a commit to branch rebalance_bug
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 120b6620d0cb1caaf9204be279d88fdc95a92a29
Author: Mark Hanson <ha...@vmware.com>
AuthorDate: Tue Aug 24 01:24:07 2021 -0700
first cut.
---
.../control/RebalanceOperationDistributedTest.java | 113 +++++++++++++++++++++
.../internal/cache/PartitionedRegionDataStore.java | 33 ++++++
.../control/PartitionRebalanceDetailsImpl.java | 21 ++++
.../partitioned/PartitionedRegionRebalanceOp.java | 4 +
4 files changed, 171 insertions(+)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
index 94bd61c..3a1bb5e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationDistributedTest.java
@@ -54,6 +54,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
@@ -76,6 +77,7 @@ import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.control.RebalanceOperation;
@@ -108,6 +110,7 @@ import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.CacheTestCase;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
import org.apache.geode.util.internal.GeodeGlossary;
@@ -131,6 +134,9 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+ @Rule
+ public DistributedDiskDirRule diskDirRule = new DistributedDiskDirRule();
+
@After
public void tearDown() {
invokeInEveryVM(() -> {
@@ -368,6 +374,89 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
}
}
+ /**
+ * Test that we correctly use the redundancy-zone property to determine where to place redundant
+ * copies of a buckets.
+ */
+ @Test
+ public void testEnforceZoneWithSixServersAndTwoZones() throws InterruptedException {
+ VM vm0 = getVM(0);
+ VM vm1 = getVM(1);
+ VM vm2 = getVM(2);
+ VM vm3 = getVM(3);
+ VM vm4 = getVM(4);
+ VM vm5 = getVM(5);
+ Stream.of(vm0, vm1, vm2).forEach(vm -> vm.invoke(() -> setRedundancyZone("A")));
+ Stream.of(vm3, vm4, vm5).forEach(vm -> vm.invoke(() -> setRedundancyZone("B")));
+
+ Stream.of(vm0, vm1, vm2, vm3, vm4, vm5).forEach(vm -> {
+ vm.invoke(() -> createPartitionedRegion("region1", 1, 113));
+ vm.invoke(() -> createPartitionedRegion("region1Ancillary", 1, 113));
+
+ });
+
+ // Create some buckets
+ vm0.invoke(() -> {
+ Map<Integer, String> mapOfData = new HashMap<>();
+ for (int i = 0; i < 100000; i++) {
+ mapOfData.put(i, "A");
+ }
+ Region<Integer, String> region = getCache().getRegion("region1");
+ region.putAll(mapOfData);
+ Region<Integer, String> region2 = getCache().getRegion("region1Ancillary");
+ region2.putAll(mapOfData);
+ });
+
+ // make sure we can tell that the buckets have low redundancy
+ // vm0.invoke(() -> validateRedundancy("region1", 113, 0, 113));
+
+ // Make sure we still have low redundancy
+ vm0.invoke(() -> validateRedundancy("region1", 113, 1, 0));
+
+ vm0.invoke(() -> {
+ InternalResourceManager manager = getCache().getInternalResourceManager();
+ RebalanceResults results = doRebalance(false, manager);
+ logger.info("Rebalance 1 Results = " + results);
+ validateStatistics(manager, results);
+ });
+
+ vm2.bounceForcibly();
+
+ vm0.invoke(() -> {
+ InternalResourceManager manager = getCache().getInternalResourceManager();
+ RebalanceResults results = doRebalance(false, manager);
+ logger.info("Rebalance 2 Results = " + results);
+ });
+ // Thread.sleep(2000);
+
+ vm2.invoke(() -> {
+ setRedundancyZone("A");
+ createPartitionedRegion("region1", 1, 113);
+ });
+
+ vm0.invoke(() -> {
+ InternalResourceManager manager = getCache().getInternalResourceManager();
+ RebalanceResults results = doRebalance(false, manager);
+ logger.info("Rebalance 3 Results = " + results);
+ });
+
+ int zoneA = vm0.invoke(() -> getBucketCount("region1"));
+ zoneA += vm1.invoke(() -> getBucketCount("region1"));
+ zoneA += vm2.invoke(() -> getBucketCount("region1"));
+
+ int zoneB = vm3.invoke(() -> getBucketCount("region1"));
+ zoneB += vm4.invoke(() -> getBucketCount("region1"));
+ zoneB += vm5.invoke(() -> getBucketCount("region1"));
+
+ assertThat(zoneA).isEqualTo(zoneB).isEqualTo(113);
+ vm0.invoke(() -> validateBucketCountLessThan("region1", 38));
+ vm1.invoke(() -> validateBucketCountLessThan("region1", 38));
+ vm2.invoke(() -> validateBucketCountLessThan("region1", 38));
+ vm3.invoke(() -> validateBucketCountLessThan("region1", 38));
+ vm4.invoke(() -> validateBucketCountLessThan("region1", 38));
+ vm5.invoke(() -> validateBucketCountLessThan("region1", 38));
+ }
+
@Test
public void testEnforceZoneWithMultipleRegions() {
VM vm0 = getVM(0);
@@ -2349,6 +2438,19 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
regionFactory.create(regionName);
}
+  /**
+   * Creates a partitioned region in this VM's cache from the given region shortcut.
+   *
+   * <p>
+   * Both recovery delays are set to -1 (presumably so that buckets only move on an explicit
+   * rebalance -- confirm against the PartitionAttributesFactory documentation).
+   *
+   * @param regionName name of the region to create
+   * @param redundantCopies number of redundant copies to configure
+   * @param numBuckets total number of buckets to configure
+   * @param regionShortcut shortcut used to obtain the region factory
+   */
+  private void createPartitionedRegion(String regionName, int redundantCopies, final int numBuckets,
+      final RegionShortcut regionShortcut) {
+    PartitionAttributesFactory attributesBuilder = new PartitionAttributesFactory()
+        .setRedundantCopies(redundantCopies)
+        .setRecoveryDelay(-1)
+        .setStartupRecoveryDelay(-1)
+        .setTotalNumBuckets(numBuckets);
+
+    RegionFactory regionBuilder = getCache().createRegionFactory(regionShortcut);
+    regionBuilder.setPartitionAttributes(attributesBuilder.create()).create(regionName);
+  }
+
private void createPartitionedRegion(String regionName, int redundantCopies) {
PartitionAttributesFactory partitionAttributesFactory = new PartitionAttributesFactory();
partitionAttributesFactory.setRedundantCopies(redundantCopies);
@@ -2648,12 +2750,23 @@ public class RebalanceOperationDistributedTest extends CacheTestCase {
assertThat(details.getLowRedundancyBucketCount()).isEqualTo(expectedLowRedundancyBucketCount);
}
+  /**
+   * Returns the size of the list reported by {@code getLocalBucketsListTestOnly()} for the named
+   * partitioned region in this VM's cache.
+   *
+   * @param regionName name of the partitioned region to inspect
+   * @return number of entries in the region's local bucket list
+   */
+  private int getBucketCount(String regionName) {
+    return ((PartitionedRegion) getCache().getRegion(regionName))
+        .getLocalBucketsListTestOnly()
+        .size();
+  }
+
/**
 * Asserts that the local bucket list of the named partitioned region in this VM's cache has
 * exactly {@code numLocalBuckets} entries.
 *
 * @param regionName name of the partitioned region to inspect
 * @param numLocalBuckets expected size of the local bucket list
 */
private void validateBucketCount(String regionName, int numLocalBuckets) {
  PartitionedRegion partitionedRegion = (PartitionedRegion) getCache().getRegion(regionName);
  assertThat(partitionedRegion.getLocalBucketsListTestOnly())
      .hasSize(numLocalBuckets);
}
+  /**
+   * Asserts that the local bucket list of the named partitioned region in this VM's cache has at
+   * most {@code numLocalBuckets} entries.
+   *
+   * <p>
+   * Despite the "LessThan" in the name, the bound is inclusive: a count equal to
+   * {@code numLocalBuckets} passes.
+   *
+   * @param regionName name of the partitioned region to inspect
+   * @param numLocalBuckets inclusive upper bound on the size of the local bucket list
+   */
+  private void validateBucketCountLessThan(String regionName, int numLocalBuckets) {
+    PartitionedRegion partitionedRegion = (PartitionedRegion) getCache().getRegion(regionName);
+    int localBucketCount = partitionedRegion.getLocalBucketsListTestOnly().size();
+
+    assertThat(localBucketCount).isLessThanOrEqualTo(numLocalBuckets);
+  }
+
private void validateStatistics(InternalResourceManager manager, RebalanceResults results) {
ResourceManagerStats stats = manager.getStats();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index d687d4c..c7f2ae1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -215,6 +215,30 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
"RegionStats-partition-" + pr.getName(), pr.getCachePerfStats(), pr,
pr.getCache().getMeterRegistry(), statisticsClock);
this.keysOfInterest = new ConcurrentHashMap();
+ launchBucketIdsLogThread();
+ }
+
+  /**
+   * Starts the background thread that runs {@link #logBucketIds()}.
+   *
+   * <p>
+   * The thread is a daemon so that this diagnostic loop, which has no natural end, cannot keep
+   * the JVM alive on shutdown. It is given a fixed name so it is identifiable in thread dumps.
+   */
+  private void launchBucketIdsLogThread() {
+    Thread bucketIdsLogThread =
+        new Thread(this::logBucketIds, "PartitionedRegionDataStore-BucketIdsLogger");
+    bucketIdsLogThread.setDaemon(true);
+    bucketIdsLogThread.start();
+  }
+
+  /**
+   * Diagnostic loop: every two seconds takes a sorted snapshot of the ids returned by
+   * {@code getAllLocalBucketIds()} and, whenever the snapshot differs from the previous one,
+   * reports it on standard output and as a warning in the log.
+   *
+   * <p>
+   * The loop ends when the running thread is interrupted. The interrupt status is restored
+   * before returning so that the owner of the thread can still observe it.
+   */
+  private void logBucketIds() {
+    List<Integer> previousBucketIdsList = Collections.emptyList();
+    while (!Thread.currentThread().isInterrupted()) {
+      List<Integer> currentBucketIdsList = new ArrayList<>(getAllLocalBucketIds());
+      Collections.sort(currentBucketIdsList);
+      // Only report when the set of bucket ids has actually changed since the last poll.
+      if (!currentBucketIdsList.equals(previousBucketIdsList)) {
+        String message = "region=" + getName() + "; bucketIdsSize=" + currentBucketIdsList.size()
+            + "; bucketIds=" + currentBucketIdsList;
+        System.out.println(message);
+        logger.warn("XXX " + message);
+        previousBucketIdsList = currentBucketIdsList;
+      }
+      try {
+        Thread.sleep(2000);
+      } catch (InterruptedException e) {
+        // Interruption is the stop signal: restore the flag and leave the loop.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
   }
/**
@@ -793,6 +817,11 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
.setPartitionedRegion(this.partitionedRegion)
.setIndexes(getIndexes(rootRegion.getFullPath(), bucketRegionName)));
this.partitionedRegion.getPrStats().incBucketCount(1);
+ System.out.println(Thread.currentThread().getName()
+ + ": PartitionedRegionDataStore.createBucketRegion region=" + bucketRegion.getName());
+ logger.warn(
+ "XXX PartitionedRegionDataStore.createBucketRegion region=" + bucketRegion.getName());
+ // new Exception());
} catch (RegionExistsException ex) {
// Bucket Region is already created, so do nothing.
if (logger.isDebugEnabled()) {
@@ -1644,6 +1673,10 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
}
this.localBucket2RegionMap.remove(Integer.valueOf(bucketId));
this.partitionedRegion.getPrStats().incBucketCount(-1);
+ System.out.println(Thread.currentThread().getName()
+ + ": PartitionedRegionDataStore.removeBucket region=" + bucketRegion.getName());
+ logger.warn("XXX PartitionedRegionDataStore.removeBucket region=" + bucketRegion.getName());
+ // new Exception());
return true;
} finally {
lock.unlock();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
index 8a6c4f9..b125ae4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
@@ -47,6 +47,27 @@ public class PartitionRebalanceDetailsImpl
private final transient PartitionedRegion region;
private long time;
+ @Override
+ public String toString() {
+ return "PartitionRebalanceDetailsImpl{" +
+ "bucketCreateBytes=" + bucketCreateBytes +
+ ", bucketCreateTime=" + bucketCreateTime +
+ ", bucketCreatesCompleted=" + bucketCreatesCompleted +
+ ", bucketRemoveBytes=" + bucketRemoveBytes +
+ ", bucketRemoveTime=" + bucketRemoveTime +
+ ", bucketRemovesCompleted=" + bucketRemovesCompleted +
+ ", bucketTransferBytes=" + bucketTransferBytes +
+ ", bucketTransferTime=" + bucketTransferTime +
+ ", bucketTransfersCompleted=" + bucketTransfersCompleted +
+ ", partitionMemberDetailsAfter=" + partitionMemberDetailsAfter +
+ ", partitionMemberDetailsBefore=" + partitionMemberDetailsBefore +
+ ", primaryTransferTime=" + primaryTransferTime +
+ ", primaryTransfersCompleted=" + primaryTransfersCompleted +
+ ", region=" + region +
+ ", time=" + time +
+ '}';
+ }
+
public PartitionRebalanceDetailsImpl(PartitionedRegion region) {
this.region = region;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index e486e78..fc7b75b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -528,6 +528,10 @@ public class PartitionedRegionRebalanceOp {
removed = getLeaderRegion().getDataStore().removeBucket(bucketId, false);
} else {
// send message to remote member...
+ String message = "XXX PartitionedRegionRebalanceOp.removeRedundantBucketForRegion target="
+ + target + "; region=" + getLeaderRegion() + "; bucketId=" + bucketId;
+ System.out.println(Thread.currentThread().getName() + ": " + message);
+ logger.warn(message, new Exception());
RemoveBucketResponse response =
RemoveBucketMessage.send(target, getLeaderRegion(), bucketId, false);
if (response != null) {