You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/04/22 02:39:23 UTC
[43/50] [abbrv] incubator-geode git commit: Revert "GEODE-1153:
BucketOperatorWrapper fails to invoke completion callback."
Revert "GEODE-1153: BucketOperatorWrapper fails to invoke completion callback."
Reverting the fix for GEODE-1153 due to failures introduced
by that fix. The rebalance operation hangs due to a concurrent
modification of the TreeSet that holds the bucket information
during the rebalance operation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7c1d867d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7c1d867d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7c1d867d
Branch: refs/heads/master
Commit: 7c1d867d82b0dc45954f1d996b5f327a73abcf1d
Parents: 616bc27
Author: Sai Boorlagadda <sb...@pivotal.io>
Authored: Mon Apr 11 10:25:52 2016 -0700
Committer: Sai Boorlagadda <sb...@pivotal.io>
Committed: Mon Apr 11 10:31:34 2016 -0700
----------------------------------------------------------------------
.../PartitionedRegionRebalanceOp.java | 322 ++++++++++++++++--
.../rebalance/BucketOperatorImpl.java | 78 -----
.../rebalance/BucketOperatorWrapper.java | 235 --------------
.../control/RebalanceOperationDUnitTest.java | 148 +--------
.../rebalance/BucketOperatorImplTest.java | 138 --------
.../rebalance/BucketOperatorWrapperTest.java | 323 -------------------
6 files changed, 296 insertions(+), 948 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 641d43d..8642876 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -47,8 +47,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag
import com.gemstone.gemfire.internal.cache.partitioned.MoveBucketMessage.MoveBucketResponse;
import com.gemstone.gemfire.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.ParallelBucketOperator;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
@@ -415,9 +413,9 @@ public class PartitionedRegionRebalanceOp {
}
BucketOperator operator = simulate ?
new SimulatedBucketOperator()
- : new BucketOperatorImpl(this);
+ : new BucketOperatorImpl();
BucketOperatorWrapper wrapper = new BucketOperatorWrapper(
- operator, rebalanceDetails, stats, leaderRegion);
+ operator, rebalanceDetails);
return wrapper;
}
@@ -511,12 +509,17 @@ public class PartitionedRegionRebalanceOp {
* the member on which to create the redundant bucket
* @param bucketId
* the identifier of the bucket
+ * @param pr
+ * the partitioned region which contains the bucket
+ * @param forRebalance
+ * true if part of a rebalance operation
* @return true if the redundant bucket was created
*/
- public boolean createRedundantBucketForRegion(
- InternalDistributedMember target, int bucketId) {
- return getLeaderRegion().getRedundancyProvider().createBackupBucketOnMember(bucketId,
- target, isRebalance, replaceOfflineData,null, true);
+ public static boolean createRedundantBucketForRegion(
+ InternalDistributedMember target, int bucketId, PartitionedRegion pr,
+ boolean forRebalance, boolean replaceOfflineData) {
+ return pr.getRedundancyProvider().createBackupBucketOnMember(bucketId,
+ target, forRebalance, replaceOfflineData,null, true);
}
/**
@@ -526,18 +529,20 @@ public class PartitionedRegionRebalanceOp {
* the member on which to create the redundant bucket
* @param bucketId
* the identifier of the bucket
+ * @param pr
+ * the partitioned region which contains the bucket
* @return true if the redundant bucket was removed
*/
- public boolean removeRedundantBucketForRegion(
- InternalDistributedMember target, int bucketId) {
+ public static boolean removeRedundantBucketForRegion(
+ InternalDistributedMember target, int bucketId, PartitionedRegion pr) {
boolean removed = false;
- if (getLeaderRegion().getDistributionManager().getId().equals(target)) {
+ if (pr.getDistributionManager().getId().equals(target)) {
// invoke directly on local member...
- removed = getLeaderRegion().getDataStore().removeBucket(bucketId, false);
+ removed = pr.getDataStore().removeBucket(bucketId, false);
}
else {
// send message to remote member...
- RemoveBucketResponse response = RemoveBucketMessage.send(target, getLeaderRegion(),
+ RemoveBucketResponse response = RemoveBucketMessage.send(target, pr,
bucketId, false);
if (response != null) {
removed = response.waitForResponse();
@@ -553,23 +558,28 @@ public class PartitionedRegionRebalanceOp {
* the member which should be primary
* @param bucketId
* the identifier of the bucket
+ * @param pr
+ * the partitioned region which contains the bucket
+ * @param forRebalance
+ * true if part of a rebalance operation
* @return true if the move was successful
*/
- public boolean movePrimaryBucketForRegion(
- InternalDistributedMember target, int bucketId) {
+ public static boolean movePrimaryBucketForRegion(
+ InternalDistributedMember target, int bucketId, PartitionedRegion pr,
+ boolean forRebalance) {
boolean movedPrimary = false;
- if (getLeaderRegion().getDistributionManager().getId().equals(target)) {
+ if (pr.getDistributionManager().getId().equals(target)) {
// invoke directly on local member...
- BucketAdvisor bucketAdvisor = getLeaderRegion().getRegionAdvisor().getBucketAdvisor(
+ BucketAdvisor bucketAdvisor = pr.getRegionAdvisor().getBucketAdvisor(
bucketId);
if (bucketAdvisor.isHosting()) {
- movedPrimary = bucketAdvisor.becomePrimary(isRebalance);
+ movedPrimary = bucketAdvisor.becomePrimary(forRebalance);
}
}
else {
// send message to remote member...
BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage.send(
- target, getLeaderRegion(), bucketId, isRebalance);
+ target, pr, bucketId, forRebalance);
if (response != null) {
movedPrimary = response.waitForResponse();
}
@@ -586,18 +596,20 @@ public class PartitionedRegionRebalanceOp {
* member which should receive the bucket
* @param bucketId
* the identifier of the bucket
+ * @param pr
+ * the partitioned region which contains the bucket
* @return true if the bucket was moved
*/
- public boolean moveBucketForRegion(InternalDistributedMember source,
- InternalDistributedMember target, int bucketId) {
+ public static boolean moveBucketForRegion(InternalDistributedMember source,
+ InternalDistributedMember target, int bucketId, PartitionedRegion pr) {
boolean movedBucket = false;
- if (getLeaderRegion().getDistributionManager().getId().equals(target)) {
+ if (pr.getDistributionManager().getId().equals(target)) {
// invoke directly on local member...
- movedBucket = getLeaderRegion().getDataStore().moveBucket(bucketId, source, false);
+ movedBucket = pr.getDataStore().moveBucket(bucketId, source, false);
}
else {
// send message to remote member...
- MoveBucketResponse response = MoveBucketMessage.send(target, getLeaderRegion(),
+ MoveBucketResponse response = MoveBucketMessage.send(target, pr,
bucketId, source);
if (response != null) {
movedBucket = response.waitForResponse();
@@ -614,10 +626,6 @@ public class PartitionedRegionRebalanceOp {
leaderRegion.getDataPolicy().withPersistence());
}
- public PartitionedRegion getLeaderRegion() {
- return leaderRegion;
- }
-
private class MembershipChangeListener implements MembershipListener {
public void memberDeparted(InternalDistributedMember id, boolean crashed) {
@@ -642,4 +650,262 @@ public class PartitionedRegionRebalanceOp {
public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
}
}
+
+ private class BucketOperatorImpl implements BucketOperator {
+
+ @Override
+ public boolean moveBucket(InternalDistributedMember source,
+ InternalDistributedMember target, int bucketId,
+ Map<String, Long> colocatedRegionBytes) {
+
+ InternalResourceManager.getResourceObserver().movingBucket(
+ leaderRegion, bucketId, source, target);
+ return moveBucketForRegion(source, target, bucketId, leaderRegion);
+ }
+
+ @Override
+ public boolean movePrimary(InternalDistributedMember source,
+ InternalDistributedMember target, int bucketId) {
+
+ InternalResourceManager.getResourceObserver().movingPrimary(
+ leaderRegion, bucketId, source, target);
+ return movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance);
+ }
+
+ @Override
+ public void createRedundantBucket(
+ InternalDistributedMember targetMember, int bucketId,
+ Map<String, Long> colocatedRegionBytes, Completion completion) {
+ boolean result = false;
+ try {
+ result = createRedundantBucketForRegion(targetMember, bucketId,
+ leaderRegion, isRebalance,replaceOfflineData);
+ } finally {
+ if(result) {
+ completion.onSuccess();
+ } else {
+ completion.onFailure();
+ }
+ }
+ }
+
+ @Override
+ public void waitForOperations() {
+ //do nothing, all operations are synchronous
+ }
+
+ @Override
+ public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
+ Map<String, Long> colocatedRegionBytes) {
+ return removeRedundantBucketForRegion(targetMember, bucketId,
+ leaderRegion);
+ }
+ }
+
+ /**
+ * A wrapper class which delegates actual bucket operations to the enclosed BucketOperator,
+ * but keeps track of statistics about how many buckets are created, transferred, etc.
+ *
+ */
+ private class BucketOperatorWrapper implements
+ BucketOperator {
+ private final BucketOperator delegate;
+ private final Set<PartitionRebalanceDetailsImpl> detailSet;
+ private final int regionCount;
+
+ public BucketOperatorWrapper(
+ BucketOperator delegate,
+ Set<PartitionRebalanceDetailsImpl> rebalanceDetails) {
+ this.delegate = delegate;
+ this.detailSet = rebalanceDetails;
+ this.regionCount = detailSet.size();
+ }
+ @Override
+ public boolean moveBucket(InternalDistributedMember sourceMember,
+ InternalDistributedMember targetMember, int id,
+ Map<String, Long> colocatedRegionBytes) {
+ long start = System.nanoTime();
+ boolean result = false;
+ long elapsed = 0;
+ long totalBytes = 0;
+
+
+ if (stats != null) {
+ stats.startBucketTransfer(regionCount);
+ }
+ try {
+ result = delegate.moveBucket(sourceMember, targetMember, id,
+ colocatedRegionBytes);
+ elapsed = System.nanoTime() - start;
+ if (result) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ String regionPath = details.getRegionPath();
+ Long regionBytes = colocatedRegionBytes.get(regionPath);
+ if(regionBytes != null) {
+ //only increment the elapsed time for the leader region
+ details.incTransfers(regionBytes.longValue(),
+ details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ totalBytes += regionBytes.longValue();
+ }
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
+ }
+ }
+ } finally {
+ if(stats != null) {
+ stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void createRedundantBucket(
+ final InternalDistributedMember targetMember, final int i,
+ final Map<String, Long> colocatedRegionBytes, final Completion completion) {
+
+ if(stats != null) {
+ stats.startBucketCreate(regionCount);
+ }
+
+ final long start = System.nanoTime();
+ delegate.createRedundantBucket(targetMember, i,
+ colocatedRegionBytes, new Completion() {
+
+ @Override
+ public void onSuccess() {
+ long totalBytes = 0;
+ long elapsed= System.nanoTime() - start;
+ if(logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ String regionPath = details.getRegionPath();
+ Long lrb = colocatedRegionBytes.get(regionPath);
+ if (lrb != null) { // region could have gone away - esp during shutdow
+ long regionBytes = lrb.longValue();
+ //Only add the elapsed time to the leader region.
+ details.incCreates(regionBytes,
+ details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ totalBytes += regionBytes;
+ }
+ }
+
+ if(stats != null) {
+ stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
+ }
+
+ }
+
+ @Override
+ public void onFailure() {
+ long elapsed= System.nanoTime() - start;
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
+ }
+
+ if(stats != null) {
+ stats.endBucketCreate(regionCount, false, 0, elapsed);
+ }
+ }
+ });
+ }
+
+ @Override
+ public boolean removeBucket(
+ InternalDistributedMember targetMember, int i,
+ Map<String, Long> colocatedRegionBytes) {
+ boolean result = false;
+ long elapsed = 0;
+ long totalBytes = 0;
+
+
+ if(stats != null) {
+ stats.startBucketRemove(regionCount);
+ }
+ try {
+ long start = System.nanoTime();
+ result = delegate.removeBucket(targetMember, i,
+ colocatedRegionBytes);
+ elapsed= System.nanoTime() - start;
+ if (result) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ String regionPath = details.getRegionPath();
+ Long lrb = colocatedRegionBytes.get(regionPath);
+ if (lrb != null) { // region could have gone away - esp during shutdow
+ long regionBytes = lrb.longValue();
+ //Only add the elapsed time to the leader region.
+ details.incRemoves(regionBytes,
+ details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ totalBytes += regionBytes;
+ }
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember);
+ }
+ }
+ } finally {
+ if(stats != null) {
+ stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean movePrimary(InternalDistributedMember source,
+ InternalDistributedMember target, int bucketId) {
+ boolean result = false;
+ long elapsed = 0;
+
+ if(stats != null) {
+ stats.startPrimaryTransfer(regionCount);
+ }
+
+ try {
+ long start = System.nanoTime();
+ result = delegate.movePrimary(source, target, bucketId);
+ elapsed = System.nanoTime() - start;
+ if (result) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
+ }
+ for (PartitionRebalanceDetailsImpl details : detailSet) {
+ details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
+ }
+ }
+ } finally {
+ if(stats != null) {
+ stats.endPrimaryTransfer(regionCount, result, elapsed);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void waitForOperations() {
+ delegate.waitForOperations();
+ }
+
+ public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
+ return this.detailSet;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
deleted file mode 100644
index 2f38752..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import java.util.Map;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
-import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
-
-public class BucketOperatorImpl implements BucketOperator {
-
- private PartitionedRegionRebalanceOp rebalanceOp;
-
- public BucketOperatorImpl(PartitionedRegionRebalanceOp rebalanceOp) {
- this.rebalanceOp = rebalanceOp;
- }
-
- @Override
- public boolean moveBucket(InternalDistributedMember source,
- InternalDistributedMember target, int bucketId,
- Map<String, Long> colocatedRegionBytes) {
-
- InternalResourceManager.getResourceObserver().movingBucket(
- rebalanceOp.getLeaderRegion(), bucketId, source, target);
- return rebalanceOp.moveBucketForRegion(source, target, bucketId);
- }
-
- @Override
- public boolean movePrimary(InternalDistributedMember source,
- InternalDistributedMember target, int bucketId) {
-
- InternalResourceManager.getResourceObserver().movingPrimary(
- rebalanceOp.getLeaderRegion(), bucketId, source, target);
- return rebalanceOp.movePrimaryBucketForRegion(target, bucketId);
- }
-
- @Override
- public void createRedundantBucket(
- InternalDistributedMember targetMember, int bucketId,
- Map<String, Long> colocatedRegionBytes, Completion completion) {
- boolean result = false;
- try {
- result = rebalanceOp.createRedundantBucketForRegion(targetMember, bucketId);
- } finally {
- if(result) {
- completion.onSuccess();
- } else {
- completion.onFailure();
- }
- }
- }
-
- @Override
- public void waitForOperations() {
- //do nothing, all operations are synchronous
- }
-
- @Override
- public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
- Map<String, Long> colocatedRegionBytes) {
- return rebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
deleted file mode 100644
index d058a04..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
-import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class BucketOperatorWrapper implements BucketOperator {
- private static final Logger logger = LogService.getLogger();
-
- private final BucketOperator delegate;
- private final Set<PartitionRebalanceDetailsImpl> detailSet;
- private final int regionCount;
- private final ResourceManagerStats stats;
- private final PartitionedRegion leaderRegion;
-
- public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails,
- ResourceManagerStats stats, PartitionedRegion leaderRegion) {
- this.delegate = delegate;
- this.detailSet = rebalanceDetails;
- this.regionCount = detailSet.size();
- this.stats = stats;
- this.leaderRegion = leaderRegion;
- }
-
- @Override
- public boolean moveBucket(InternalDistributedMember sourceMember,
- InternalDistributedMember targetMember, int id,
- Map<String, Long> colocatedRegionBytes) {
- long start = System.nanoTime();
- boolean result = false;
- long elapsed = 0;
- long totalBytes = 0;
-
- if (stats != null) {
- stats.startBucketTransfer(regionCount);
- }
- try {
- result = delegate.moveBucket(sourceMember, targetMember, id, colocatedRegionBytes);
- elapsed = System.nanoTime() - start;
- if (result) {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- String regionPath = details.getRegionPath();
- Long regionBytes = colocatedRegionBytes.get(regionPath);
- if (regionBytes != null) {
- // only increment the elapsed time for the leader region
- details.incTransfers(regionBytes.longValue(),
- details.getRegion().equals(leaderRegion) ? elapsed : 0);
- totalBytes += regionBytes.longValue();
- }
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
- }
- }
- } finally {
- if (stats != null) {
- stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
- }
- }
-
- return result;
- }
-
- @Override
- public void createRedundantBucket(
- final InternalDistributedMember targetMember, final int i,
- final Map<String, Long> colocatedRegionBytes, final Completion completion) {
-
- if (stats != null) {
- stats.startBucketCreate(regionCount);
- }
-
- final long start = System.nanoTime();
- delegate.createRedundantBucket(targetMember, i,
- colocatedRegionBytes, new Completion() {
-
- @Override
- public void onSuccess() {
- long totalBytes = 0;
- long elapsed = System.nanoTime() - start;
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- String regionPath = details.getRegionPath();
- Long lrb = colocatedRegionBytes.get(regionPath);
- if (lrb != null) { // region could have gone away - esp during shutdow
- long regionBytes = lrb.longValue();
- // Only add the elapsed time to the leader region.
- details.incCreates(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
- totalBytes += regionBytes;
- }
- }
-
- if (stats != null) {
- stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
- }
-
- //invoke onSuccess on the received completion callback
- completion.onSuccess();
- }
-
- @Override
- public void onFailure() {
- long elapsed = System.nanoTime() - start;
-
- if (logger.isDebugEnabled()) {
- logger.info("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
- }
-
- if (stats != null) {
- stats.endBucketCreate(regionCount, false, 0, elapsed);
- }
-
- //invoke onFailure on the received completion callback
- completion.onFailure();
- }
- });
- }
-
- @Override
- public boolean removeBucket(
- InternalDistributedMember targetMember, int i,
- Map<String, Long> colocatedRegionBytes) {
- boolean result = false;
- long elapsed = 0;
- long totalBytes = 0;
-
- if (stats != null) {
- stats.startBucketRemove(regionCount);
- }
- try {
- long start = System.nanoTime();
- result = delegate.removeBucket(targetMember, i, colocatedRegionBytes);
- elapsed = System.nanoTime() - start;
- if (result) {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- String regionPath = details.getRegionPath();
- Long lrb = colocatedRegionBytes.get(regionPath);
- if (lrb != null) { // region could have gone away - esp during shutdow
- long regionBytes = lrb.longValue();
- // Only add the elapsed time to the leader region.
- details.incRemoves(regionBytes,
- details.getRegion().equals(leaderRegion) ? elapsed : 0);
- totalBytes += regionBytes;
- }
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember);
- }
- }
- } finally {
- if (stats != null) {
- stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
- }
- }
-
- return result;
- }
-
- @Override
- public boolean movePrimary(InternalDistributedMember source,
- InternalDistributedMember target, int bucketId) {
- boolean result = false;
- long elapsed = 0;
-
- if (stats != null) {
- stats.startPrimaryTransfer(regionCount);
- }
-
- try {
- long start = System.nanoTime();
- result = delegate.movePrimary(source, target, bucketId);
- elapsed = System.nanoTime() - start;
- if (result) {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
- }
- for (PartitionRebalanceDetailsImpl details : detailSet) {
- details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
- }
- }
- } finally {
- if (stats != null) {
- stats.endPrimaryTransfer(regionCount, result, elapsed);
- }
- }
-
- return result;
- }
-
- @Override
- public void waitForOperations() {
- delegate.waitForOperations();
- }
-
- public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
- return this.detailSet;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index a5d2a03..26ebc16 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -16,13 +16,6 @@
*/
package com.gemstone.gemfire.internal.cache.control;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -50,6 +43,7 @@ import com.gemstone.gemfire.cache.DiskStoreFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.EvictionAction;
import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.GemFireCache;
import com.gemstone.gemfire.cache.LoaderHelper;
import com.gemstone.gemfire.cache.PartitionAttributes;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
@@ -69,12 +63,11 @@ import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.ColocationHelper;
import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider;
+import com.gemstone.gemfire.internal.cache.InternalCache;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
@@ -979,143 +972,6 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
return (DistributedMember) vm.invoke(createPrRegion);
}
- public void testRecoverRedundancyBalancingIfCreateBucketFails() {
- Host host = Host.getHost(0);
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
-
-
- final DistributedMember member1 = createPrRegion(vm0, "region1", 100, null);
-
- vm0.invoke(new SerializableRunnable("createSomeBuckets") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion("region1");
- for(int i = 0; i < 1; i++) {
- region.put(Integer.valueOf(i), "A");
- }
- }
- });
-
-
- SerializableRunnable checkRedundancy= new SerializableRunnable("checkRedundancy") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion("region1");
- PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
- assertEquals(1, details.getCreatedBucketCount());
- assertEquals(0,details.getActualRedundantCopies());
- assertEquals(1,details.getLowRedundancyBucketCount());
- }
- };
-
- vm0.invoke(checkRedundancy);
-
- //Now create the region in 2 more VMs
- //Let localMaxMemory(VM1) > localMaxMemory(VM2)
- //so that redundant bucket will always be attempted on VM1
- final DistributedMember member2 = createPrRegion(vm1, "region1", 100, null);
- final DistributedMember member3 = createPrRegion(vm2, "region1", 90, null);
-
- vm0.invoke(checkRedundancy);
-
- //Inject mock PRHARedundancyProvider to simulate createBucketFailures
- vm0.invoke(new SerializableRunnable("injectCreateBucketFailureAndRebalance") {
-
- @Override
- public void run() {
- GemFireCacheImpl cache = spy(getGemfireCache());
- //set the spied cache instance
- GemFireCacheImpl origCache = GemFireCacheImpl.setInstanceForTests(cache);
-
- PartitionedRegion origRegion = (PartitionedRegion) cache.getRegion("region1");
- PartitionedRegion spyRegion = spy(origRegion);
- PRHARedundancyProvider redundancyProvider = spy(new PRHARedundancyProvider(spyRegion));
-
- //return the spied region when ever getPartitionedRegions() is invoked
- Set<PartitionedRegion> parRegions = cache.getPartitionedRegions();
- parRegions.remove(origRegion);
- parRegions.add(spyRegion);
-
- doReturn(parRegions).when(cache).getPartitionedRegions();
- doReturn(redundancyProvider).when(spyRegion).getRedundancyProvider();
-
- //simulate create bucket fails on member2 and test if it creates on member3
- doReturn(false).when(redundancyProvider).createBackupBucketOnMember(anyInt(), eq((InternalDistributedMember) member2), anyBoolean(), anyBoolean(), any(), anyBoolean());
-
- //Now simulate a rebalance
- //Create operationImpl and not factory as we need spied cache to be passed to operationImpl
- RegionFilter filter = new FilterByPath(null, null);
- RebalanceOperationImpl operation = new RebalanceOperationImpl(cache, false, filter);
- operation.start();
- RebalanceResults results = null;
- try {
- results = operation.getResults(MAX_WAIT, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- Assert.fail("Interrupted waiting on rebalance", e);
- } catch (TimeoutException e) {
- Assert.fail("Timeout waiting on rebalance", e);
- }
- assertEquals(1, results.getTotalBucketCreatesCompleted());
- assertEquals(0, results.getTotalPrimaryTransfersCompleted());
- assertEquals(0, results.getTotalBucketTransferBytes());
- assertEquals(0, results.getTotalBucketTransfersCompleted());
- Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
- assertEquals(1, detailSet.size());
- PartitionRebalanceInfo details = detailSet.iterator().next();
- assertEquals(1, details.getBucketCreatesCompleted());
- assertEquals(0, details.getPrimaryTransfersCompleted());
- assertEquals(0, details.getBucketTransferBytes());
- assertEquals(0, details.getBucketTransfersCompleted());
-
- Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
- assertEquals(3, afterDetails.size());
- for(PartitionMemberInfo memberDetails: afterDetails) {
- if(memberDetails.getDistributedMember().equals(member1)) {
- assertEquals(1, memberDetails.getBucketCount());
- assertEquals(1, memberDetails.getPrimaryCount());
- } else if(memberDetails.getDistributedMember().equals(member2)) {
- assertEquals(0, memberDetails.getBucketCount());
- assertEquals(0, memberDetails.getPrimaryCount());
- } else if(memberDetails.getDistributedMember().equals(member3)) {
- assertEquals(1, memberDetails.getBucketCount());
- assertEquals(0, memberDetails.getPrimaryCount());
- }
- }
-
- ResourceManagerStats stats = cache.getResourceManager().getStats();
-
- assertEquals(0, stats.getRebalancesInProgress());
- assertEquals(1, stats.getRebalancesCompleted());
- assertEquals(0, stats.getRebalanceBucketCreatesInProgress());
- assertEquals(results.getTotalBucketCreatesCompleted(), stats.getRebalanceBucketCreatesCompleted());
- assertEquals(1, stats.getRebalanceBucketCreatesFailed());
-
- //set the original cache
- GemFireCacheImpl.setInstanceForTests(origCache);
- }
- });
-
- SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
-
- public void run() {
- Cache cache = getCache();
- Region region = cache.getRegion("region1");
- PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
- assertEquals(1, details.getCreatedBucketCount());
- assertEquals(1,details.getActualRedundantCopies());
- assertEquals(0,details.getLowRedundancyBucketCount());
- }
- };
-
- vm0.invoke(checkRedundancyFixed);
- vm1.invoke(checkRedundancyFixed);
- vm2.invoke(checkRedundancyFixed);
- }
-
public void testRecoverRedundancyColocatedRegionsSimulation() {
recoverRedundancyColocatedRegions(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
deleted file mode 100644
index a5c8982..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
-import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class BucketOperatorImplTest {
-
- private InternalResourceManager.ResourceObserver resourceObserver;
-
- private BucketOperatorImpl operator;
-
- private PartitionedRegion region;
- private PartitionedRegionRebalanceOp rebalanceOp;
- private Completion completion;
-
- private Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
- private int bucketId = 1;
- private InternalDistributedMember sourceMember, targetMember;
-
- @Before
- public void setup() throws UnknownHostException {
- region = mock(PartitionedRegion.class);
- rebalanceOp = mock(PartitionedRegionRebalanceOp.class);
- completion = mock(Completion.class);
-
- resourceObserver = spy(new InternalResourceManager.ResourceObserverAdapter());
- InternalResourceManager.setResourceObserver(resourceObserver);
-
- doReturn(region).when(rebalanceOp).getLeaderRegion();
-
- operator = new BucketOperatorImpl(rebalanceOp);
-
- sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
- targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
- }
-
- @After
- public void after() {
- reset(resourceObserver);
- }
-
- @Test
- public void moveBucketShouldDelegateToParRegRebalanceOpMoveBucketForRegion() throws UnknownHostException {
- doReturn(true).when(rebalanceOp).moveBucketForRegion(sourceMember, targetMember, bucketId);
-
- operator.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- verify(resourceObserver, times(1)).movingBucket(region, bucketId, sourceMember, targetMember);
- verify(rebalanceOp, times(1)).moveBucketForRegion(sourceMember, targetMember, bucketId);
- }
-
- @Test
- public void movePrimaryShouldDelegateToParRegRebalanceOpMovePrimaryBucketForRegion() throws UnknownHostException {
- doReturn(true).when(rebalanceOp).movePrimaryBucketForRegion(targetMember, bucketId);
-
- operator.movePrimary(sourceMember, targetMember, bucketId);
-
- verify(resourceObserver, times(1)).movingPrimary(region, bucketId, sourceMember, targetMember);
- verify(rebalanceOp, times(1)).movePrimaryBucketForRegion(targetMember, bucketId);
- }
-
- @Test
- public void createBucketShouldDelegateToParRegRebalanceOpCreateRedundantBucketForRegion() throws UnknownHostException {
- doReturn(true).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId);
-
- operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
-
- verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId);
- }
-
- @Test
- public void createBucketShouldInvokeOnSuccessIfCreateBucketSucceeds() {
- doReturn(true).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId);
-
- operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
-
- verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId);
- verify(completion, times(1)).onSuccess();
- }
-
- @Test
- public void createBucketShouldInvokeOnFailureIfCreateBucketFails() {
- doReturn(false).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId); //return false for create fail
-
- operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
-
- verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId);
- verify(completion, times(1)).onFailure();
- }
-
- @Test
- public void removeBucketShouldDelegateToParRegRebalanceOpRemoveRedundantBucketForRegion() {
- doReturn(true).when(rebalanceOp).removeRedundantBucketForRegion(targetMember, bucketId);
-
- operator.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
- verify(rebalanceOp, times(1)).removeRedundantBucketForRegion(targetMember, bucketId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
deleted file mode 100644
index 558062b..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.spy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
-import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class BucketOperatorWrapperTest {
-
- private ResourceManagerStats stats;
- private PartitionedRegion leaderRegion;
- private PartitionedRegion colocatedRegion;
- private Set<PartitionRebalanceDetailsImpl> rebalanceDetails;
- private BucketOperatorWrapper wrapper;
- private BucketOperatorImpl delegate;
-
- private Map<String, Long> colocatedRegionBytes;
- private int bucketId = 1;
- private InternalDistributedMember sourceMember, targetMember;
-
- private final static String PR_LEADER_REGION_NAME = "leadregion1";
- private final static String PR_COLOCATED_REGION_NAME = "coloregion1";
-
- @Before
- public void setUp() throws UnknownHostException {
- colocatedRegionBytes = new HashMap<String, Long>();
- colocatedRegionBytes.put(PR_LEADER_REGION_NAME, 100L);
- colocatedRegionBytes.put(PR_COLOCATED_REGION_NAME, 50L);
-
- sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
- targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
-
- stats = mock(ResourceManagerStats.class);
- doNothing().when(stats).startBucketCreate(anyInt());
- doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong());
-
- leaderRegion = mock(PartitionedRegion.class);
- doReturn(PR_LEADER_REGION_NAME).when(leaderRegion).getFullPath();
- colocatedRegion = mock(PartitionedRegion.class);
- doReturn(PR_COLOCATED_REGION_NAME).when(colocatedRegion).getFullPath();
-
- rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
- PartitionRebalanceDetailsImpl details = spy(new PartitionRebalanceDetailsImpl(leaderRegion));
- rebalanceDetails.add(details);
-
- delegate = mock(BucketOperatorImpl.class);
-
- wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, leaderRegion);
- }
-
- @Test
- public void bucketWrapperShouldDelegateCreateBucketToEnclosedOperator() {
- Completion completionSentToWrapper = mock(Completion.class);
-
- doNothing().when(delegate).createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-
- wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-
- verify(delegate, times(1)).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
- }
-
- @Test
- public void bucketWrapperShouldRecordNumberOfBucketsCreatedIfCreateBucketSucceeds() {
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation) {
- //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
- ((Completion) invocation.getArguments()[3]).onSuccess();
- return null;
- }
- }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-
- Completion completionSentToWrapper = mock(Completion.class);
- wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-
- //verify create buckets is recorded
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
- verify(details, times(1)).incCreates(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
- else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
- verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
- }
- }
-
- @Test
- public void bucketWrapperShouldNotRecordNumberOfBucketsCreatedIfCreateBucketFails() {
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation) {
- //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
- ((Completion) invocation.getArguments()[3]).onFailure();
- return null;
- }
- }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-
- Completion completionSentToWrapper = mock(Completion.class);
- wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-
- //verify create buckets is not recorded
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- verify(details, times(0)).incTransfers(anyLong(), anyLong());
- }
- }
-
- @Test
- public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() {
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation) {
- //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
- ((Completion) invocation.getArguments()[3]).onFailure();
- return null;
- }
- }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-
- Completion completionSentToWrapper = mock(Completion.class);
- wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-
- //verify onFailure is invoked
- verify(completionSentToWrapper, times(1)).onFailure();
- }
-
- @Test
- public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() {
- doAnswer(new Answer<Object>() {
- public Object answer(InvocationOnMock invocation) {
- //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
- ((Completion) invocation.getArguments()[3]).onSuccess();
- return null;
- }
- }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-
- Completion completionSentToWrapper = mock(Completion.class);
- wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-
- verify(completionSentToWrapper, times(1)).onSuccess();
- }
-
- @Test
- public void bucketWrapperShouldDelegateMoveBucketToEnclosedOperator() {
- doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- //verify the delegate is invoked
- verify(delegate, times(1)).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startBucketTransfer(anyInt());
- verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldRecordBytesTransferredPerRegionAfterMoveBucketIsSuccessful() {
- doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- //verify the details is updated with bytes transfered
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
- verify(details, times(1)).incTransfers(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
- else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
- verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
- }
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startBucketTransfer(anyInt());
- verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldDoNotRecordBytesTransferedIfMoveBucketFails() {
- doReturn(false).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
- //verify the details is not updated with bytes transfered
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- verify(details, times(0)).incTransfers(anyLong(), anyLong());
- }
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startBucketTransfer(anyInt());
- verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldDelegateRemoveBucketToEnclosedOperator() {
- wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
- //verify the delegate is invoked
- verify(delegate, times(1)).removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startBucketRemove(anyInt());
- verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldRecordBucketRemovesPerRegionAfterRemoveBucketIsSuccessful() {
- doReturn(true).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
- wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
- //verify the details is updated with bytes transfered
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
- verify(details, times(1)).incRemoves((eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME))), anyLong());
- else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
- verify(details, times(1)).incRemoves(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
- }
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startBucketRemove(anyInt());
- verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldDoNotRecordBucketRemovesIfMoveBucketFails() {
- doReturn(false).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
- wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
- //verify the details is not updated with bytes transfered
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- verify(details, times(0)).incTransfers(anyLong(), anyLong());
- }
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startBucketRemove(anyInt());
- verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldDelegateMovePrimaryToEnclosedOperator() {
- wrapper.movePrimary(sourceMember, targetMember, bucketId);
-
- //verify the delegate is invoked
- verify(delegate, times(1)).movePrimary(sourceMember, targetMember, bucketId);
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startPrimaryTransfer(anyInt());
- verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldRecordPrimaryTransfersPerRegionAfterMovePrimaryIsSuccessful() {
- doReturn(true).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
-
- wrapper.movePrimary(sourceMember, targetMember, bucketId);
-
- //verify the details is updated with bytes transfered
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
- verify(details, times(1)).incPrimaryTransfers(anyLong());
- else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
- verify(details, times(1)).incPrimaryTransfers(0); //elapsed is recorded only if its leader
- }
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startPrimaryTransfer(anyInt());
- verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
- }
-
- @Test
- public void bucketWrapperShouldNotRecordPrimaryTransfersPerRegionAfterMovePrimaryFails() {
- doReturn(false).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
-
- wrapper.movePrimary(sourceMember, targetMember, bucketId);
-
- //verify the details is not updated with bytes transfered
- for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
- verify(details, times(0)).incTransfers(anyLong(), anyLong());
- }
-
- //verify we recorded necessary stats
- verify(stats, times(1)).startPrimaryTransfer(anyInt());
- verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
- }
-}