You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/04/04 21:11:19 UTC
incubator-geode git commit: Added tests for BucketOperatorWrapper
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1153 41312881c -> c964c27ac
Added tests for BucketOperatorWrapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c964c27a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c964c27a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c964c27a
Branch: refs/heads/feature/GEODE-1153
Commit: c964c27acc5b230b4a43da122e64b96c7d942a9a
Parents: 4131288
Author: Sai Boorlagadda <sb...@pivotal.io>
Authored: Mon Apr 4 12:08:12 2016 -0700
Committer: Sai Boorlagadda <sb...@pivotal.io>
Committed: Mon Apr 4 12:08:12 2016 -0700
----------------------------------------------------------------------
.../rebalance/BucketOperatorWrapper.java | 32 ++-
.../control/RebalanceOperationDUnitTest.java | 6 +-
.../rebalance/BucketOperatorWrapperTest.java | 258 +++++++++++++++++--
3 files changed, 259 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c964c27a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
index 0d0edd2..c5ae30d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
@@ -20,7 +20,8 @@ public class BucketOperatorWrapper implements BucketOperator {
private final ResourceManagerStats stats;
private final PartitionedRegion leaderRegion;
- public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails, ResourceManagerStats stats, PartitionedRegion leaderRegion) {
+ public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails,
+ ResourceManagerStats stats, PartitionedRegion leaderRegion) {
this.delegate = delegate;
this.detailSet = rebalanceDetails;
this.regionCount = detailSet.size();
@@ -29,7 +30,9 @@ public class BucketOperatorWrapper implements BucketOperator {
}
@Override
- public boolean moveBucket(InternalDistributedMember sourceMember, InternalDistributedMember targetMember, int id, Map<String, Long> colocatedRegionBytes) {
+ public boolean moveBucket(InternalDistributedMember sourceMember,
+ InternalDistributedMember targetMember, int id,
+ Map<String, Long> colocatedRegionBytes) {
long start = System.nanoTime();
boolean result = false;
long elapsed = 0;
@@ -50,7 +53,8 @@ public class BucketOperatorWrapper implements BucketOperator {
Long regionBytes = colocatedRegionBytes.get(regionPath);
if (regionBytes != null) {
// only increment the elapsed time for the leader region
- details.incTransfers(regionBytes.longValue(), details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ details.incTransfers(regionBytes.longValue(),
+ details.getRegion().equals(leaderRegion) ? elapsed : 0);
totalBytes += regionBytes.longValue();
}
}
@@ -69,15 +73,17 @@ public class BucketOperatorWrapper implements BucketOperator {
}
@Override
- public void createRedundantBucket(final InternalDistributedMember targetMember, final int i, final Map<String, Long> colocatedRegionBytes,
- final Completion completion) {
+ public void createRedundantBucket(
+ final InternalDistributedMember targetMember, final int i,
+ final Map<String, Long> colocatedRegionBytes, final Completion completion) {
if (stats != null) {
stats.startBucketCreate(regionCount);
}
final long start = System.nanoTime();
- delegate.createRedundantBucket(targetMember, i, colocatedRegionBytes, new Completion() {
+ delegate.createRedundantBucket(targetMember, i,
+ colocatedRegionBytes, new Completion() {
@Override
public void onSuccess() {
@@ -109,9 +115,9 @@ public class BucketOperatorWrapper implements BucketOperator {
public void onFailure() {
long elapsed = System.nanoTime() - start;
- //if (logger.isDebugEnabled()) {
+ if (logger.isDebugEnabled()) {
logger.info("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
- //}
+ }
if (stats != null) {
stats.endBucketCreate(regionCount, false, 0, elapsed);
@@ -124,7 +130,9 @@ public class BucketOperatorWrapper implements BucketOperator {
}
@Override
- public boolean removeBucket(InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes) {
+ public boolean removeBucket(
+ InternalDistributedMember targetMember, int i,
+ Map<String, Long> colocatedRegionBytes) {
boolean result = false;
long elapsed = 0;
long totalBytes = 0;
@@ -146,7 +154,8 @@ public class BucketOperatorWrapper implements BucketOperator {
if (lrb != null) { // region could have gone away - esp during shutdow
long regionBytes = lrb.longValue();
// Only add the elapsed time to the leader region.
- details.incRemoves(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
+ details.incRemoves(regionBytes,
+ details.getRegion().equals(leaderRegion) ? elapsed : 0);
totalBytes += regionBytes;
}
}
@@ -165,7 +174,8 @@ public class BucketOperatorWrapper implements BucketOperator {
}
@Override
- public boolean movePrimary(InternalDistributedMember source, InternalDistributedMember target, int bucketId) {
+ public boolean movePrimary(InternalDistributedMember source,
+ InternalDistributedMember target, int bucketId) {
boolean result = false;
long elapsed = 0;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c964c27a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index 0dfbcb9..a5d2a03 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -1015,13 +1015,15 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
vm0.invoke(checkRedundancy);
//Now create the region in 2 more VMs
+ //Let localMaxMemory(VM1) > localMaxMemory(VM2)
+ //so that redundant bucket will always be attempted on VM1
final DistributedMember member2 = createPrRegion(vm1, "region1", 100, null);
- final DistributedMember member3 = createPrRegion(vm2, "region1", 100, null);
+ final DistributedMember member3 = createPrRegion(vm2, "region1", 90, null);
vm0.invoke(checkRedundancy);
//Inject mock PRHARedundancyProvider to simulate createBucketFailures
- vm0.invoke(new SerializableRunnable("inject failure") {
+ vm0.invoke(new SerializableRunnable("injectCreateBucketFailureAndRebalance") {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c964c27a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
index 3678430..6b1551b 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
@@ -7,9 +7,11 @@ import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -18,6 +20,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
@@ -28,36 +31,87 @@ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
import com.gemstone.gemfire.test.junit.categories.UnitTest;
@Category(UnitTest.class)
public class BucketOperatorWrapperTest {
- public BucketOperatorWrapper createBucketOperatorWrapper(BucketOperatorImpl delegate) {
+ private ResourceManagerStats stats;
+ private PartitionedRegion leaderRegion;
+ private PartitionedRegion colocatedRegion;
+ private Set<PartitionRebalanceDetailsImpl> rebalanceDetails;
+ private BucketOperatorWrapper wrapper;
+ private BucketOperatorImpl delegate;
+
+ private Map<String, Long> colocatedRegionBytes;
+ private int bucketId = 1;
+ private InternalDistributedMember sourceMember, targetMember;
+
+ private final static String PR_LEADER_REGION_NAME = "leadregion1";
+ private final static String PR_COLOCATED_REGION_NAME = "coloregion1";
+
+ @Before
+ public void setUp() throws UnknownHostException {
+ colocatedRegionBytes = new HashMap<String, Long>();
+ colocatedRegionBytes.put(PR_LEADER_REGION_NAME, 100L);
+ colocatedRegionBytes.put(PR_COLOCATED_REGION_NAME, 50L);
- ResourceManagerStats stats = mock(ResourceManagerStats.class);
+ sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+ targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
+
+ stats = mock(ResourceManagerStats.class);
doNothing().when(stats).startBucketCreate(anyInt());
doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong());
- Set<PartitionRebalanceDetailsImpl> rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
- PartitionedRegion region = mock(PartitionedRegion.class);
+ leaderRegion = mock(PartitionedRegion.class);
+ doReturn(PR_LEADER_REGION_NAME).when(leaderRegion).getFullPath();
+ colocatedRegion = mock(PartitionedRegion.class);
+ doReturn(PR_COLOCATED_REGION_NAME).when(colocatedRegion).getFullPath();
+
+ rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
+ PartitionRebalanceDetailsImpl details = spy(new PartitionRebalanceDetailsImpl(leaderRegion));
+ rebalanceDetails.add(details);
+
+ delegate = mock(BucketOperatorImpl.class);
- BucketOperatorWrapper wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, region);
+ wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, leaderRegion);
+ }
+
+ @Test
+ public void bucketWrapperShouldDelegateCreateBucketToEnclosedOperator() {
+ Completion completionSentToWrapper = mock(Completion.class);
+
+ doNothing().when(delegate).createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+
+ wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
- return wrapper;
+ verify(delegate, times(1)).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
}
@Test
- public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() throws UnknownHostException {
- BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);
- BucketOperatorWrapper wrapper = createBucketOperatorWrapper(delegate);
-
- Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
- int bucketId = 1;
- InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
-
+ public void bucketWrapperShouldRecordNumberOfBucketsCreatedIfCreateBucketSucceeds() {
+ //stub the delegate so that it immediately reports success to the Completion it is handed
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) {
+ //index 3 is the 4th argument: the Completion the wrapper passes to BucketOperatorImpl.createRedundantBucket
+ ((Completion) invocation.getArguments()[3]).onSuccess();
+ return null;
+ }
+ }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+
+ Completion completionSentToWrapper = mock(Completion.class);
+ wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+
+ //verify the bucket create is recorded on each region's details via incCreates
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+ verify(details, times(1)).incCreates(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
+ else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+ verify(details, times(1)).incCreates(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed time is recorded only for the leader region, so a colocated region gets 0
+ }
+ }
+
+ @Test
+ public void bucketWrapperShouldNotRecordNumberOfBucketsCreatedIfCreateBucketFails() {
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
//index 3 is the 4th argument: the Completion the wrapper passes to BucketOperatorImpl.createRedundantBucket
@@ -69,18 +123,31 @@ public class BucketOperatorWrapperTest {
Completion completionSentToWrapper = mock(Completion.class);
wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+ //verify create buckets is not recorded; the check targets incCreates because that is the
+ //call the success-case test expects, so asserting on incTransfers would pass vacuously
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ verify(details, times(0)).incCreates(anyLong(), anyLong());
+ }
+ }
+
+ @Test
+ public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() {
+ doAnswer(new Answer<Object>() {
+ public Object answer(InvocationOnMock invocation) {
+ //index 3 is the 4th argument: the Completion the wrapper passes to BucketOperatorImpl.createRedundantBucket
+ ((Completion) invocation.getArguments()[3]).onFailure();
+ return null;
+ }
+ }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+
+ Completion completionSentToWrapper = mock(Completion.class);
+ wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+
+ //verify the failure is forwarded exactly once to the Completion that was given to the wrapper
verify(completionSentToWrapper, times(1)).onFailure();
}
@Test
- public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketFails() throws UnknownHostException {
- BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);
- BucketOperatorWrapper wrapper = createBucketOperatorWrapper(delegate);
-
- Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
- int bucketId = 1;
- InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
-
+ public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() {
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
//index 3 is the 4th argument: the Completion the wrapper passes to BucketOperatorImpl.createRedundantBucket
@@ -94,4 +161,147 @@ public class BucketOperatorWrapperTest {
verify(completionSentToWrapper, times(1)).onSuccess();
}
+
+ @Test
+ public void bucketWrapperShouldDelegateMoveBucketToEnclosedOperator() {
+ doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ //verify the delegate is invoked exactly once with the same arguments the wrapper received
+ verify(delegate, times(1)).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ //verify the bucket-transfer start and end stats were each recorded exactly once
+ verify(stats, times(1)).startBucketTransfer(anyInt());
+ verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldRecordBytesTransferredPerRegionAfterMoveBucketIsSuccessful() {
+ doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ //verify each region's details recorded the bytes transferred for that region
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+ verify(details, times(1)).incTransfers(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
+ else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+ verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed time is recorded only for the leader region, so a colocated region gets 0
+ }
+
+ //verify the bucket-transfer start and end stats were each recorded exactly once
+ verify(stats, times(1)).startBucketTransfer(anyInt());
+ verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldDoNotRecordBytesTransferedIfMoveBucketFails() {
+ doReturn(false).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+ //verify no region's details recorded transferred bytes, since the delegate reported failure
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ verify(details, times(0)).incTransfers(anyLong(), anyLong());
+ }
+
+ //verify the bucket-transfer start and end stats were still each recorded exactly once
+ verify(stats, times(1)).startBucketTransfer(anyInt());
+ verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldDelegateRemoveBucketToEnclosedOperator() {
+ wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+ //verify the delegate is invoked exactly once with the same arguments the wrapper received
+ verify(delegate, times(1)).removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+ //verify the bucket-remove start and end stats were each recorded exactly once
+ verify(stats, times(1)).startBucketRemove(anyInt());
+ verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldRecordBucketRemovesPerRegionAfterRemoveBucketIsSuccessful() {
+ doReturn(true).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+ wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+ //verify each region's details recorded the bytes removed for that region
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+ verify(details, times(1)).incRemoves((eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME))), anyLong());
+ else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+ verify(details, times(1)).incRemoves(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed time is recorded only for the leader region, so a colocated region gets 0
+ }
+
+ //verify the bucket-remove start and end stats were each recorded exactly once
+ verify(stats, times(1)).startBucketRemove(anyInt());
+ verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldDoNotRecordBucketRemovesIfMoveBucketFails() {
+ //NOTE: despite "MoveBucket" in the method name, this test exercises a failed removeBucket
+ doReturn(false).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+ wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+ //verify no region's details recorded removed bytes; removes are recorded through incRemoves,
+ //so asserting on incTransfers here would not detect a wrongly recorded remove
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ verify(details, times(0)).incRemoves(anyLong(), anyLong());
+ }
+
+ //verify the bucket-remove start and end stats were still each recorded exactly once
+ verify(stats, times(1)).startBucketRemove(anyInt());
+ verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldDelegateMovePrimaryToEnclosedOperator() {
+ wrapper.movePrimary(sourceMember, targetMember, bucketId);
+
+ //verify the delegate is invoked exactly once with the same arguments the wrapper received
+ verify(delegate, times(1)).movePrimary(sourceMember, targetMember, bucketId);
+
+ //verify the primary-transfer start and end stats were each recorded exactly once
+ verify(stats, times(1)).startPrimaryTransfer(anyInt());
+ verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldRecordPrimaryTransfersPerRegionAfterMovePrimaryIsSuccessful() {
+ doReturn(true).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
+
+ wrapper.movePrimary(sourceMember, targetMember, bucketId);
+
+ //verify each region's details recorded one primary transfer
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+ verify(details, times(1)).incPrimaryTransfers(anyLong());
+ else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+ verify(details, times(1)).incPrimaryTransfers(0); //elapsed time is recorded only for the leader region, so a colocated region gets 0
+ }
+
+ //verify the primary-transfer start and end stats were each recorded exactly once
+ verify(stats, times(1)).startPrimaryTransfer(anyInt());
+ verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
+ }
+
+ @Test
+ public void bucketWrapperShouldNotRecordPrimaryTransfersPerRegionAfterMovePrimaryFails() {
+ doReturn(false).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
+
+ wrapper.movePrimary(sourceMember, targetMember, bucketId);
+
+ //verify no region's details recorded a primary transfer; the success-case test expects
+ //incPrimaryTransfers, so that (not incTransfers) is the call a failure must not make
+ for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+ verify(details, times(0)).incPrimaryTransfers(anyLong());
+ }
+
+ //verify the primary-transfer start and end stats were still each recorded exactly once
+ verify(stats, times(1)).startPrimaryTransfer(anyInt());
+ verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
+ }
}