You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/04/01 03:19:56 UTC
incubator-geode git commit: Fixed BucketOperatorWrapper to invoke onFailure if bucket creation fails
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1153 [created] b35af269c
Fixed BucketOperatorWrapper to invoke onFailure if bucket creation fails
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b35af269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b35af269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b35af269
Branch: refs/heads/feature/GEODE-1153
Commit: b35af269c1232a5f509cb9dfcfde1dd7ebe5b89c
Parents: 8503fc3
Author: Sai Boorlagadda <sb...@pivotal.io>
Authored: Thu Mar 31 15:37:25 2016 -0700
Committer: Sai Boorlagadda <sb...@pivotal.io>
Committed: Thu Mar 31 15:50:38 2016 -0700
----------------------------------------------------------------------
.../PartitionedRegionRebalanceOp.java | 264 +------------------
.../rebalance/BucketOperatorImpl.java | 69 +++++
.../rebalance/BucketOperatorWrapper.java | 209 +++++++++++++++
.../control/RebalanceOperationDUnitTest.java | 167 +++++++++++-
.../rebalance/BucketOperatorImplTest.java | 145 ++++++++++
.../rebalance/BucketOperatorWrapperTest.java | 97 +++++++
6 files changed, 689 insertions(+), 262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 8642876..f36c74f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -47,6 +47,8 @@ import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag
import com.gemstone.gemfire.internal.cache.partitioned.MoveBucketMessage.MoveBucketResponse;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.ParallelBucketOperator;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
@@ -413,9 +415,9 @@ public class PartitionedRegionRebalanceOp {
}
BucketOperator operator = simulate ?
new SimulatedBucketOperator()
- : new BucketOperatorImpl();
+ : new BucketOperatorImpl(leaderRegion, isRebalance, replaceOfflineData);
BucketOperatorWrapper wrapper = new BucketOperatorWrapper(
- operator, rebalanceDetails);
+ operator, rebalanceDetails, stats, leaderRegion);
return wrapper;
}
@@ -650,262 +652,4 @@ public class PartitionedRegionRebalanceOp {
public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
}
}
-
- private class BucketOperatorImpl implements BucketOperator {
-
- @Override
- public boolean moveBucket(InternalDistributedMember source,
- InternalDistributedMember target, int bucketId,
- Map<String, Long> colocatedRegionBytes) {
-
- InternalResourceManager.getResourceObserver().movingBucket(
- leaderRegion, bucketId, source, target);
- return moveBucketForRegion(source, target, bucketId, leaderRegion);
- }
-
- @Override
- public boolean movePrimary(InternalDistributedMember source,
- InternalDistributedMember target, int bucketId) {
-
- InternalResourceManager.getResourceObserver().movingPrimary(
- leaderRegion, bucketId, source, target);
- return movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance);
- }
-
- @Override
- public void createRedundantBucket(
- InternalDistributedMember targetMember, int bucketId,
- Map<String, Long> colocatedRegionBytes, Completion completion) {
- boolean result = false;
- try {
- result = createRedundantBucketForRegion(targetMember, bucketId,
- leaderRegion, isRebalance,replaceOfflineData);
- } finally {
- if(result) {
- completion.onSuccess();
- } else {
- completion.onFailure();
- }
- }
- }
-
- @Override
- public void waitForOperations() {
- //do nothing, all operations are synchronous
- }
-
- @Override
- public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
- Map<String, Long> colocatedRegionBytes) {
- return removeRedundantBucketForRegion(targetMember, bucketId,
- leaderRegion);
- }
- }
-
- /**
- * A wrapper class which delegates actual bucket operations to the enclosed BucketOperator,
- * but keeps track of statistics about how many buckets are created, transfered, etc.
- *
- */
- private class BucketOperatorWrapper implements
- BucketOperator {
- private final BucketOperator delegate;
- private final Set<PartitionRebalanceDetailsImpl> detailSet;
- private final int regionCount;
-
- public BucketOperatorWrapper(
- BucketOperator delegate,
- Set<PartitionRebalanceDetailsImpl> rebalanceDetails) {
- this.delegate = delegate;
- this.detailSet = rebalanceDetails;
- this.regionCount = detailSet.size();
- }
- @Override
- public boolean moveBucket(InternalDistributedMember sourceMember,
- InternalDistributedMember targetMember, int id,
- Map<String, Long> colocatedRegionBytes) {
- long start = System.nanoTime();
- boolean result = false;
- long elapsed = 0;
- long totalBytes = 0;
-
-
- if (stats != null) {
- stats.startBucketTransfer(regionCount);
- }
- try {
- result = delegate.moveBucket(sourceMember, targetMember, id,
- colocatedRegionBytes);
- elapsed = System.nanoTime() - start;
- if (result) {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- String regionPath = details.getRegionPath();
- Long regionBytes = colocatedRegionBytes.get(regionPath);
- if(regionBytes != null) {
- //only increment the elapsed time for the leader region
- details.incTransfers(regionBytes.longValue(),
- details.getRegion().equals(leaderRegion) ? elapsed : 0);
- totalBytes += regionBytes.longValue();
- }
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
- }
- }
- } finally {
- if(stats != null) {
- stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
- }
- }
-
- return result;
- }
-
- @Override
- public void createRedundantBucket(
- final InternalDistributedMember targetMember, final int i,
- final Map<String, Long> colocatedRegionBytes, final Completion completion) {
-
- if(stats != null) {
- stats.startBucketCreate(regionCount);
- }
-
- final long start = System.nanoTime();
- delegate.createRedundantBucket(targetMember, i,
- colocatedRegionBytes, new Completion() {
-
- @Override
- public void onSuccess() {
- long totalBytes = 0;
- long elapsed= System.nanoTime() - start;
- if(logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- String regionPath = details.getRegionPath();
- Long lrb = colocatedRegionBytes.get(regionPath);
- if (lrb != null) { // region could have gone away - esp during shutdow
- long regionBytes = lrb.longValue();
- //Only add the elapsed time to the leader region.
- details.incCreates(regionBytes,
- details.getRegion().equals(leaderRegion) ? elapsed : 0);
- totalBytes += regionBytes;
- }
- }
-
- if(stats != null) {
- stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
- }
-
- }
-
- @Override
- public void onFailure() {
- long elapsed= System.nanoTime() - start;
-
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
- }
-
- if(stats != null) {
- stats.endBucketCreate(regionCount, false, 0, elapsed);
- }
- }
- });
- }
-
- @Override
- public boolean removeBucket(
- InternalDistributedMember targetMember, int i,
- Map<String, Long> colocatedRegionBytes) {
- boolean result = false;
- long elapsed = 0;
- long totalBytes = 0;
-
-
- if(stats != null) {
- stats.startBucketRemove(regionCount);
- }
- try {
- long start = System.nanoTime();
- result = delegate.removeBucket(targetMember, i,
- colocatedRegionBytes);
- elapsed= System.nanoTime() - start;
- if (result) {
- if(logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- String regionPath = details.getRegionPath();
- Long lrb = colocatedRegionBytes.get(regionPath);
- if (lrb != null) { // region could have gone away - esp during shutdow
- long regionBytes = lrb.longValue();
- //Only add the elapsed time to the leader region.
- details.incRemoves(regionBytes,
- details.getRegion().equals(leaderRegion) ? elapsed : 0);
- totalBytes += regionBytes;
- }
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember);
- }
- }
- } finally {
- if(stats != null) {
- stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
- }
- }
-
- return result;
- }
-
- @Override
- public boolean movePrimary(InternalDistributedMember source,
- InternalDistributedMember target, int bucketId) {
- boolean result = false;
- long elapsed = 0;
-
- if(stats != null) {
- stats.startPrimaryTransfer(regionCount);
- }
-
- try {
- long start = System.nanoTime();
- result = delegate.movePrimary(source, target, bucketId);
- elapsed = System.nanoTime() - start;
- if (result) {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
- }
- }
- } finally {
- if(stats != null) {
- stats.endPrimaryTransfer(regionCount, result, elapsed);
- }
- }
-
- return result;
- }
-
- @Override
- public void waitForOperations() {
- delegate.waitForOperations();
- }
-
- public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
- return this.detailSet;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
new file mode 100644
index 0000000..b7c172b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
@@ -0,0 +1,69 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+
+public class BucketOperatorImpl implements BucketOperator {
+
+ private PartitionedRegion leaderRegion;
+ private boolean isRebalance;
+ private boolean replaceOfflineData;
+
+ public BucketOperatorImpl(PartitionedRegion leaderRegion, boolean isRebalance, boolean replaceOfflineData) {
+ this.leaderRegion = leaderRegion;
+ this.isRebalance = isRebalance;
+ this.replaceOfflineData = replaceOfflineData;
+ }
+
+ @Override
+ public boolean moveBucket(InternalDistributedMember source,
+ InternalDistributedMember target, int bucketId,
+ Map<String, Long> colocatedRegionBytes) {
+
+ InternalResourceManager.getResourceObserver().movingBucket(
+ leaderRegion, bucketId, source, target);
+ return PartitionedRegionRebalanceOp.moveBucketForRegion(source, target, bucketId, leaderRegion);
+ }
+
+ @Override
+ public boolean movePrimary(InternalDistributedMember source,
+ InternalDistributedMember target, int bucketId) {
+
+ InternalResourceManager.getResourceObserver().movingPrimary(
+ leaderRegion, bucketId, source, target);
+ return PartitionedRegionRebalanceOp.movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance);
+ }
+
+ @Override
+ public void createRedundantBucket(
+ InternalDistributedMember targetMember, int bucketId,
+ Map<String, Long> colocatedRegionBytes, Completion completion) {
+ boolean result = false;
+ try {
+ result = PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId,
+ leaderRegion, isRebalance, replaceOfflineData);
+ } finally {
+ if(result) {
+ completion.onSuccess();
+ } else {
+ completion.onFailure();
+ }
+ }
+ }
+
+ @Override
+ public void waitForOperations() {
+ //do nothing, all operations are synchronous
+ }
+
+ @Override
+ public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
+ Map<String, Long> colocatedRegionBytes) {
+ return PartitionedRegionRebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId,
+ leaderRegion);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
new file mode 100644
index 0000000..0d0edd2
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
@@ -0,0 +1,209 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
+import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class BucketOperatorWrapper implements BucketOperator {
+ private static final Logger logger = LogService.getLogger();
+
+ private final BucketOperator delegate;
+ private final Set<PartitionRebalanceDetailsImpl> detailSet;
+ private final int regionCount;
+ private final ResourceManagerStats stats;
+ private final PartitionedRegion leaderRegion;
+
+ public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails, ResourceManagerStats stats, PartitionedRegion leaderRegion) {
+ this.delegate = delegate;
+ this.detailSet = rebalanceDetails;
+ this.regionCount = detailSet.size();
+ this.stats = stats;
+ this.leaderRegion = leaderRegion;
+ }
+
+ @Override
+ public boolean moveBucket(InternalDistributedMember sourceMember, InternalDistributedMember targetMember, int id, Map<String, Long> colocatedRegionBytes) {
+ long start = System.nanoTime();
+ boolean result = false;
+ long elapsed = 0;
+ long totalBytes = 0;
+
+ if (stats != null) {
+ stats.startBucketTransfer(regionCount);
+ }
+ try {
+ result = delegate.moveBucket(sourceMember, targetMember, id, colocatedRegionBytes);
+ elapsed = System.nanoTime() - start;
+ if (result) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ String regionPath = details.getRegionPath();
+ Long regionBytes = colocatedRegionBytes.get(regionPath);
+ if (regionBytes != null) {
+ // only increment the elapsed time for the leader region
+ details.incTransfers(regionBytes.longValue(), details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ totalBytes += regionBytes.longValue();
+ }
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
+ }
+ }
+ } finally {
+ if (stats != null) {
+ stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void createRedundantBucket(final InternalDistributedMember targetMember, final int i, final Map<String, Long> colocatedRegionBytes,
+ final Completion completion) {
+
+ if (stats != null) {
+ stats.startBucketCreate(regionCount);
+ }
+
+ final long start = System.nanoTime();
+ delegate.createRedundantBucket(targetMember, i, colocatedRegionBytes, new Completion() {
+
+ @Override
+ public void onSuccess() {
+ long totalBytes = 0;
+ long elapsed = System.nanoTime() - start;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ String regionPath = details.getRegionPath();
+ Long lrb = colocatedRegionBytes.get(regionPath);
+ if (lrb != null) { // region could have gone away - esp during shutdown
+ long regionBytes = lrb.longValue();
+ // Only add the elapsed time to the leader region.
+ details.incCreates(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ totalBytes += regionBytes;
+ }
+ }
+
+ if (stats != null) {
+ stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
+ }
+
+ //invoke onSuccess on the received completion callback
+ completion.onSuccess();
+ }
+
+ @Override
+ public void onFailure() {
+ long elapsed = System.nanoTime() - start;
+
+ //if (logger.isDebugEnabled()) {
+ logger.info("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
+ //}
+
+ if (stats != null) {
+ stats.endBucketCreate(regionCount, false, 0, elapsed);
+ }
+
+ //invoke onFailure on the received completion callback
+ completion.onFailure();
+ }
+ });
+ }
+
+ @Override
+ public boolean removeBucket(InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes) {
+ boolean result = false;
+ long elapsed = 0;
+ long totalBytes = 0;
+
+ if (stats != null) {
+ stats.startBucketRemove(regionCount);
+ }
+ try {
+ long start = System.nanoTime();
+ result = delegate.removeBucket(targetMember, i, colocatedRegionBytes);
+ elapsed = System.nanoTime() - start;
+ if (result) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ String regionPath = details.getRegionPath();
+ Long lrb = colocatedRegionBytes.get(regionPath);
+ if (lrb != null) { // region could have gone away - esp during shutdown
+ long regionBytes = lrb.longValue();
+ // Only add the elapsed time to the leader region.
+ details.incRemoves(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ totalBytes += regionBytes;
+ }
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember);
+ }
+ }
+ } finally {
+ if (stats != null) {
+ stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean movePrimary(InternalDistributedMember source, InternalDistributedMember target, int bucketId) {
+ boolean result = false;
+ long elapsed = 0;
+
+ if (stats != null) {
+ stats.startPrimaryTransfer(regionCount);
+ }
+
+ try {
+ long start = System.nanoTime();
+ result = delegate.movePrimary(source, target, bucketId);
+ elapsed = System.nanoTime() - start;
+ if (result) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
+ }
+ }
+ } finally {
+ if (stats != null) {
+ stats.endPrimaryTransfer(regionCount, result, elapsed);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void waitForOperations() {
+ delegate.waitForOperations();
+ }
+
+ public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
+ return this.detailSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index 26ebc16..4915037 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -16,6 +16,13 @@
*/
package com.gemstone.gemfire.internal.cache.control;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -43,7 +50,6 @@ import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.cache.LoaderHelper;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
@@ -63,11 +69,12 @@ import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.ColocationHelper;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalCache;
+import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
@@ -972,6 +979,162 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
return (DistributedMember) vm.invoke(createPrRegion);
}
+ public void testRecoverRedundancyBalancingIfCreateBucketFails() {
+ recoverRedundancyBalancingIfCreateBucketFails(false);
+ }
+
+ public void recoverRedundancyBalancingIfCreateBucketFails(boolean simulate) {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+
+ final DistributedMember member1 = createPrRegion(vm0, "region1", 100, null);
+
+ vm0.invoke(new SerializableRunnable("createSomeBuckets") {
+
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region1");
+ for(int i = 0; i < 1; i++) {
+ region.put(Integer.valueOf(i), "A");
+ }
+ }
+ });
+
+
+ SerializableRunnable checkRedundancy= new SerializableRunnable("checkRedundancy") {
+
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region1");
+ PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+ assertEquals(1, details.getCreatedBucketCount());
+ assertEquals(0,details.getActualRedundantCopies());
+ assertEquals(1,details.getLowRedundancyBucketCount());
+ }
+ };
+
+ vm0.invoke(checkRedundancy);
+
+ //Now create the region in 2 more VMs
+ final DistributedMember member2 = createPrRegion(vm1, "region1", 100, null);
+ final DistributedMember member3 = createPrRegion(vm2, "region1", 100, null);
+
+ vm0.invoke(checkRedundancy);
+
+ //Inject mock PRHARedundancyProvider to simulate createBucketFailures
+ vm0.invoke(new SerializableRunnable("inject failure") {
+
+ @Override
+ public void run() {
+ GemFireCacheImpl cache = spy(getGemfireCache());
+ //set the spied cache instance
+ GemFireCacheImpl.setInstanceForTests(cache);
+
+ PartitionedRegion realRegion = (PartitionedRegion) cache.getRegion("region1");
+ PartitionedRegion spyRegion = spy(realRegion);
+ PRHARedundancyProvider redundancyProvider = spy(new PRHARedundancyProvider(spyRegion));
+
+ Set<PartitionedRegion> parRegions = cache.getPartitionedRegions();
+ parRegions.remove(realRegion);
+ parRegions.add(spyRegion);
+
+ doReturn(spyRegion).when(cache).getRegion("region1");
+ doReturn(parRegions).when(cache).getPartitionedRegions();
+
+ doReturn(redundancyProvider).when(spyRegion).getRedundancyProvider();
+
+ //simulate create bucket fails on member2
+ doReturn(false).when(redundancyProvider).createBackupBucketOnMember(anyInt(), eq((InternalDistributedMember) member2), anyBoolean(), anyBoolean(), any(), anyBoolean());
+
+ }
+ });
+
+ //Now simulate a rebalance
+ vm0.invoke(new SerializableRunnable("rebalance") {
+
+ public void run() {
+ //Retrieve spied cache from GemFireCacheImpl and not using getCache()
+ GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ RegionFilter filter = new FilterByPath(null, null);
+ RebalanceOperationImpl factory = new RebalanceOperationImpl(cache, false, filter);
+ factory.start();
+ RebalanceResults results = null;
+ try {
+ results = factory.getResults(MAX_WAIT, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Assert.fail("Interrupted waiting on rebalance", e);
+ } catch (TimeoutException e) {
+ Assert.fail("Timeout waiting on rebalance", e);
+ }
+ assertEquals(1, results.getTotalBucketCreatesCompleted());
+ assertEquals(0, results.getTotalPrimaryTransfersCompleted());
+ assertEquals(0, results.getTotalBucketTransferBytes());
+ assertEquals(0, results.getTotalBucketTransfersCompleted());
+ Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
+ assertEquals(1, detailSet.size());
+ PartitionRebalanceInfo details = detailSet.iterator().next();
+ assertEquals(1, details.getBucketCreatesCompleted());
+ assertEquals(0, details.getPrimaryTransfersCompleted());
+ assertEquals(0, details.getBucketTransferBytes());
+ assertEquals(0, details.getBucketTransfersCompleted());
+
+ Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
+ assertEquals(3, afterDetails.size());
+ for(PartitionMemberInfo memberDetails: afterDetails) {
+ if(memberDetails.getDistributedMember().equals(member1)) {
+ assertEquals(1, memberDetails.getBucketCount());
+ assertEquals(1, memberDetails.getPrimaryCount());
+ } else if(memberDetails.getDistributedMember().equals(member2)) {
+ assertEquals(0, memberDetails.getBucketCount());
+ assertEquals(0, memberDetails.getPrimaryCount());
+ } else if(memberDetails.getDistributedMember().equals(member3)) {
+ assertEquals(1, memberDetails.getBucketCount());
+ assertEquals(0, memberDetails.getPrimaryCount());
+ }
+ }
+
+ if(!simulate) {
+ ResourceManagerStats stats = cache.getResourceManager().getStats();
+
+ assertEquals(0, stats.getRebalancesInProgress());
+ assertEquals(1, stats.getRebalancesCompleted());
+ assertEquals(0, stats.getRebalanceBucketCreatesInProgress());
+ assertEquals(results.getTotalBucketCreatesCompleted(), stats.getRebalanceBucketCreatesCompleted());
+ assertEquals(1, stats.getRebalanceBucketCreatesFailed());
+ }
+ }
+ });
+
+ if(!simulate) {
+ SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
+
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region1");
+ PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+ assertEquals(1, details.getCreatedBucketCount());
+ assertEquals(1,details.getActualRedundantCopies());
+ assertEquals(0,details.getLowRedundancyBucketCount());
+ }
+ };
+
+ vm0.invoke(checkRedundancyFixed);
+ vm1.invoke(checkRedundancyFixed);
+ vm2.invoke(checkRedundancyFixed);
+ }
+
+ //reset the cache
+ vm0.invoke(new SerializableRunnable() {
+ @Override
+ public void run() throws Exception {
+ GemFireCacheImpl.setInstanceForTests(null);
+ }
+ });
+ }
+
public void testRecoverRedundancyColocatedRegionsSimulation() {
recoverRedundancyColocatedRegions(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
new file mode 100644
index 0000000..d3643da
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
@@ -0,0 +1,145 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("*.UnitTest")
+@PrepareForTest({ InternalResourceManager.class, PartitionedRegionRebalanceOp.class }) // classes whose static methods are mocked below
+public class BucketOperatorImplTest {
+ // Unit tests for BucketOperatorImpl: each test stubs a static PartitionedRegionRebalanceOp method and checks that the operator calls it.
+ private InternalResourceManager.ResourceObserver resourceObserver;
+
+ private BucketOperatorImpl operator;
+
+ @Mock
+ private PartitionedRegion region;
+ private boolean isRebalance = true;
+ private boolean replaceOfflineData = true;
+
+ private Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
+ private int bucketId = 1;
+ private InternalDistributedMember sourceMember, targetMember;
+ @Mock
+ Completion completion;
+
+ @Before
+ public void setup() throws UnknownHostException {
+ resourceObserver = spy(new InternalResourceManager.ResourceObserverAdapter());
+ // Route the static InternalResourceManager.getResourceObserver() to the spy so observer callbacks can be verified.
+ PowerMockito.mockStatic(InternalResourceManager.class);
+ when(InternalResourceManager.getResourceObserver()).thenReturn(resourceObserver);
+
+ operator = new BucketOperatorImpl(region, isRebalance, replaceOfflineData);
+ // Two members built from different addresses act as source and target.
+ sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+ targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
+ }
+
+ @After
+ public void after() {
+ reset(resourceObserver); // reset the spy between tests
+ }
+
+ @Test
+ public void moveBucketShouldDelegateToParRegRebalanceOpMoveBucketForRegion() throws UnknownHostException {
+ PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+ when(PartitionedRegionRebalanceOp.moveBucketForRegion(sourceMember, targetMember, bucketId, region)).thenReturn(true);
+
+ operator.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ verify(resourceObserver, times(1)).movingBucket(region, bucketId, sourceMember, targetMember);
+ // verifyStatic(...) applies to the static call made on the following line.
+ PowerMockito.verifyStatic(times(1));
+ PartitionedRegionRebalanceOp.moveBucketForRegion(sourceMember, targetMember, bucketId, region);
+ }
+
+ @Test
+ public void movePrimaryShouldDelegateToParRegRebalanceOpMovePrimaryBucketForRegion() throws UnknownHostException {
+ PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+ when(PartitionedRegionRebalanceOp.movePrimaryBucketForRegion(targetMember, bucketId, region, isRebalance)).thenReturn(true);
+
+ operator.movePrimary(sourceMember, targetMember, bucketId);
+
+ verify(resourceObserver, times(1)).movingPrimary(region, bucketId, sourceMember, targetMember);
+
+ PowerMockito.verifyStatic(times(1));
+ PartitionedRegionRebalanceOp.movePrimaryBucketForRegion(targetMember, bucketId, region, isRebalance);
+ }
+
+ @Test
+ public void createBucketShouldDelegateToParRegRebalanceOpCreateRedundantBucketForRegion() throws UnknownHostException {
+ PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+ when(PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData)).thenReturn(true);
+
+ operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
+
+ PowerMockito.verifyStatic(times(1));
+ PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData);
+ }
+
+ @Test
+ public void createBucketShouldInvokeOnSuccessIfCreateBucketSucceeds() {
+ PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+ when(PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData)).thenReturn(true);
+
+ operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
+
+ PowerMockito.verifyStatic(times(1));
+ PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData);
+ // A true result from the static helper must be reported to the Completion as success.
+ verify(completion, times(1)).onSuccess();
+ }
+
+ @Test
+ public void createBucketShouldInvokeOnFailureIfCreateBucketFails() {
+ PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+ when(PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData)).thenReturn(false); //return false for create fail
+
+ operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
+
+ PowerMockito.verifyStatic(times(1));
+ PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData);
+ // A false result from the static helper must be reported to the Completion as failure.
+ verify(completion, times(1)).onFailure();
+ }
+
+ @Test
+ public void removeBucketShouldDelegateToParRegRebalanceOpRemoveRedundantBucketForRegion() {
+ PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+ when(PartitionedRegionRebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId, region)).thenReturn(true);
+
+ operator.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+ PowerMockito.verifyStatic(times(1));
+ PartitionedRegionRebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId, region);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
new file mode 100644
index 0000000..3678430
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
@@ -0,0 +1,97 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
+import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class BucketOperatorWrapperTest {
+ // Checks that BucketOperatorWrapper forwards the outcome reported by its delegate to the Completion supplied by the caller.
+ public BucketOperatorWrapper createBucketOperatorWrapper(BucketOperatorImpl delegate) {
+ // Builds a wrapper over the given delegate using mocked stats, a mocked region and an empty details set.
+ ResourceManagerStats stats = mock(ResourceManagerStats.class);
+ doNothing().when(stats).startBucketCreate(anyInt());
+ doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong());
+
+ Set<PartitionRebalanceDetailsImpl> rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
+ PartitionedRegion region = mock(PartitionedRegion.class);
+
+ BucketOperatorWrapper wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, region);
+
+ return wrapper;
+ }
+
+ @Test
+ public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() throws UnknownHostException {
+ BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);
+ BucketOperatorWrapper wrapper = createBucketOperatorWrapper(delegate);
+
+ Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
+ int bucketId = 1;
+ InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+ // Delegate stub: simulate a failed create by signalling failure on the Completion it is handed.
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) {
+ //4th argument (index 3) is the Completion object sent to BucketOperatorImpl.createRedundantBucket
+ ((Completion) invocation.getArguments()[3]).onFailure();
+ return null;
+ }
+ }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+
+ Completion completionSentToWrapper = mock(Completion.class);
+ wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+
+ verify(completionSentToWrapper, times(1)).onFailure();
+ }
+
+ @Test
+ public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() throws UnknownHostException {
+ BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);
+ BucketOperatorWrapper wrapper = createBucketOperatorWrapper(delegate);
+
+ Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
+ int bucketId = 1;
+ InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+ // Delegate stub: simulate a successful create by signalling success on the Completion it is handed.
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) {
+ //4th argument (index 3) is the Completion object sent to BucketOperatorImpl.createRedundantBucket
+ ((Completion) invocation.getArguments()[3]).onSuccess();
+ return null;
+ }
+ }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+
+ Completion completionSentToWrapper = mock(Completion.class);
+ wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+
+ verify(completionSentToWrapper, times(1)).onSuccess();
+ }
+}