You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/04/04 21:11:19 UTC

incubator-geode git commit: Added tests for BucketOperatorWrapper

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1153 41312881c -> c964c27ac


Added tests for BucketOperatorWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c964c27a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c964c27a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c964c27a

Branch: refs/heads/feature/GEODE-1153
Commit: c964c27acc5b230b4a43da122e64b96c7d942a9a
Parents: 4131288
Author: Sai Boorlagadda <sb...@pivotal.io>
Authored: Mon Apr 4 12:08:12 2016 -0700
Committer: Sai Boorlagadda <sb...@pivotal.io>
Committed: Mon Apr 4 12:08:12 2016 -0700

----------------------------------------------------------------------
 .../rebalance/BucketOperatorWrapper.java        |  32 ++-
 .../control/RebalanceOperationDUnitTest.java    |   6 +-
 .../rebalance/BucketOperatorWrapperTest.java    | 258 +++++++++++++++++--
 3 files changed, 259 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c964c27a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
index 0d0edd2..c5ae30d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
@@ -20,7 +20,8 @@ public class BucketOperatorWrapper implements BucketOperator {
   private final ResourceManagerStats stats;
   private final PartitionedRegion leaderRegion;
 
-  public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails, ResourceManagerStats stats, PartitionedRegion leaderRegion) {
+  public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails, 
+      ResourceManagerStats stats, PartitionedRegion leaderRegion) {
     this.delegate = delegate;
     this.detailSet = rebalanceDetails;
     this.regionCount = detailSet.size();
@@ -29,7 +30,9 @@ public class BucketOperatorWrapper implements BucketOperator {
   }
 
   @Override
-  public boolean moveBucket(InternalDistributedMember sourceMember, InternalDistributedMember targetMember, int id, Map<String, Long> colocatedRegionBytes) {
+  public boolean moveBucket(InternalDistributedMember sourceMember, 
+      InternalDistributedMember targetMember, int id, 
+      Map<String, Long> colocatedRegionBytes) {
     long start = System.nanoTime();
     boolean result = false;
     long elapsed = 0;
@@ -50,7 +53,8 @@ public class BucketOperatorWrapper implements BucketOperator {
           Long regionBytes = colocatedRegionBytes.get(regionPath);
           if (regionBytes != null) {
             // only increment the elapsed time for the leader region
-            details.incTransfers(regionBytes.longValue(), details.getRegion().equals(leaderRegion) ? elapsed : 0);
+            details.incTransfers(regionBytes.longValue(), 
+                details.getRegion().equals(leaderRegion) ? elapsed : 0);
             totalBytes += regionBytes.longValue();
           }
         }
@@ -69,15 +73,17 @@ public class BucketOperatorWrapper implements BucketOperator {
   }
 
   @Override
-  public void createRedundantBucket(final InternalDistributedMember targetMember, final int i, final Map<String, Long> colocatedRegionBytes,
-      final Completion completion) {
+  public void createRedundantBucket(
+      final InternalDistributedMember targetMember, final int i, 
+      final Map<String, Long> colocatedRegionBytes, final Completion completion) {
 
     if (stats != null) {
       stats.startBucketCreate(regionCount);
     }
 
     final long start = System.nanoTime();
-    delegate.createRedundantBucket(targetMember, i, colocatedRegionBytes, new Completion() {
+    delegate.createRedundantBucket(targetMember, i, 
+        colocatedRegionBytes, new Completion() {
 
       @Override
       public void onSuccess() {
@@ -109,9 +115,9 @@ public class BucketOperatorWrapper implements BucketOperator {
       public void onFailure() {
         long elapsed = System.nanoTime() - start;
 
-        //if (logger.isDebugEnabled()) {
+        if (logger.isDebugEnabled()) {
           logger.info("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
-        //}
+        }
 
         if (stats != null) {
           stats.endBucketCreate(regionCount, false, 0, elapsed);
@@ -124,7 +130,9 @@ public class BucketOperatorWrapper implements BucketOperator {
   }
 
   @Override
-  public boolean removeBucket(InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes) {
+  public boolean removeBucket(
+      InternalDistributedMember targetMember, int i, 
+      Map<String, Long> colocatedRegionBytes) {
     boolean result = false;
     long elapsed = 0;
     long totalBytes = 0;
@@ -146,7 +154,8 @@ public class BucketOperatorWrapper implements BucketOperator {
           if (lrb != null) { // region could have gone away - esp during shutdow
             long regionBytes = lrb.longValue();
             // Only add the elapsed time to the leader region.
-            details.incRemoves(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
+            details.incRemoves(regionBytes, 
+                details.getRegion().equals(leaderRegion) ? elapsed : 0);
             totalBytes += regionBytes;
           }
         }
@@ -165,7 +174,8 @@ public class BucketOperatorWrapper implements BucketOperator {
   }
 
   @Override
-  public boolean movePrimary(InternalDistributedMember source, InternalDistributedMember target, int bucketId) {
+  public boolean movePrimary(InternalDistributedMember source, 
+      InternalDistributedMember target, int bucketId) {
     boolean result = false;
     long elapsed = 0;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c964c27a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index 0dfbcb9..a5d2a03 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -1015,13 +1015,15 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
     vm0.invoke(checkRedundancy);
         
     //Now create the region in 2 more VMs
+    //Let localMaxMemory(VM1) > localMaxMemory(VM2)
+    //so that redundant bucket will always be attempted on VM1
     final DistributedMember member2 = createPrRegion(vm1, "region1", 100, null);
-    final DistributedMember member3 = createPrRegion(vm2, "region1", 100, null);
+    final DistributedMember member3 = createPrRegion(vm2, "region1", 90, null);
     
     vm0.invoke(checkRedundancy);
     
     //Inject mock PRHARedundancyProvider to simulate createBucketFailures
-    vm0.invoke(new SerializableRunnable("inject failure") {
+    vm0.invoke(new SerializableRunnable("injectCreateBucketFailureAndRebalance") {
       
       @Override
       public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c964c27a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
index 3678430..6b1551b 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
@@ -7,9 +7,11 @@ import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.spy;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -18,6 +20,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
@@ -28,36 +31,87 @@ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
 import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class BucketOperatorWrapperTest {
   
-  public BucketOperatorWrapper createBucketOperatorWrapper(BucketOperatorImpl delegate) {
+  private ResourceManagerStats stats;
+  private PartitionedRegion leaderRegion;
+  private PartitionedRegion colocatedRegion;
+  private Set<PartitionRebalanceDetailsImpl> rebalanceDetails;
+  private BucketOperatorWrapper wrapper;
+  private BucketOperatorImpl delegate;
+  
+  private Map<String, Long> colocatedRegionBytes;
+  private int bucketId = 1;
+  private InternalDistributedMember sourceMember, targetMember;
+  
+  private final static String PR_LEADER_REGION_NAME = "leadregion1";
+  private final static String PR_COLOCATED_REGION_NAME = "coloregion1";
+  
+  @Before
+  public void setUp() throws UnknownHostException {
+    colocatedRegionBytes = new HashMap<String, Long>();
+    colocatedRegionBytes.put(PR_LEADER_REGION_NAME, 100L);
+    colocatedRegionBytes.put(PR_COLOCATED_REGION_NAME, 50L);
     
-    ResourceManagerStats stats = mock(ResourceManagerStats.class);
+    sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+    targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
+
+    stats = mock(ResourceManagerStats.class);
     doNothing().when(stats).startBucketCreate(anyInt());
     doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong());
     
-    Set<PartitionRebalanceDetailsImpl> rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
-    PartitionedRegion region = mock(PartitionedRegion.class);
+    leaderRegion = mock(PartitionedRegion.class);
+    doReturn(PR_LEADER_REGION_NAME).when(leaderRegion).getFullPath();
+    colocatedRegion = mock(PartitionedRegion.class);
+    doReturn(PR_COLOCATED_REGION_NAME).when(colocatedRegion).getFullPath();
+    
+    rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
+    PartitionRebalanceDetailsImpl details = spy(new PartitionRebalanceDetailsImpl(leaderRegion));
+    rebalanceDetails.add(details);
+    
+    delegate = mock(BucketOperatorImpl.class);
     
-    BucketOperatorWrapper wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, region);
+    wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, leaderRegion);
+  }
+  
+  @Test
+  public void bucketWrapperShouldDelegateCreateBucketToEnclosedOperator() {
+    Completion completionSentToWrapper = mock(Completion.class);
+
+    doNothing().when(delegate).createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+
+    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
     
-    return wrapper;
+    verify(delegate, times(1)).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
   }
   
   @Test
-  public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() throws UnknownHostException {
-    BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);   
-    BucketOperatorWrapper wrapper = createBucketOperatorWrapper(delegate);
-        
-    Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
-    int bucketId = 1;
-    InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
-        
+  public void bucketWrapperShouldRecordNumberOfBucketsCreatedIfCreateBucketSucceeds() {
+    doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) {
+        //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
+        ((Completion) invocation.getArguments()[3]).onSuccess();
+        return null;
+      }
+    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+    
+    Completion completionSentToWrapper = mock(Completion.class);
+    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+    
+    //verify create buckets is recorded
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+        verify(details, times(1)).incCreates(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
+      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+        verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
+    }
+  }
+  
+  @Test
+  public void bucketWrapperShouldNotRecordNumberOfBucketsCreatedIfCreateBucketFails() {
     doAnswer(new Answer<Object>() {
       public Object answer(InvocationOnMock invocation) {
         //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
@@ -69,18 +123,31 @@ public class BucketOperatorWrapperTest {
     Completion completionSentToWrapper = mock(Completion.class);
     wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
     
+    //verify create buckets is not recorded
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+      verify(details, times(0)).incTransfers(anyLong(), anyLong());
+    }
+  }
+  
+  @Test
+  public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() {        
+    doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) {
+        //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
+        ((Completion) invocation.getArguments()[3]).onFailure();
+        return null;
+      }
+    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+    
+    Completion completionSentToWrapper = mock(Completion.class);
+    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+    
+    //verify onFailure is invoked
     verify(completionSentToWrapper, times(1)).onFailure();
   }
   
   @Test
-  public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketFails() throws UnknownHostException {   
-    BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);   
-    BucketOperatorWrapper wrapper =  createBucketOperatorWrapper(delegate);
-    
-    Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
-    int bucketId = 1;
-    InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
-        
+  public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() {       
     doAnswer(new Answer<Object>() {
       public Object answer(InvocationOnMock invocation) {
         //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
@@ -94,4 +161,147 @@ public class BucketOperatorWrapperTest {
     
     verify(completionSentToWrapper, times(1)).onSuccess();
   }
+  
+  @Test
+  public void bucketWrapperShouldDelegateMoveBucketToEnclosedOperator() {    
+    doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify the delegate is invoked
+    verify(delegate, times(1)).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startBucketTransfer(anyInt());
+    verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldRecordBytesTransferredPerRegionAfterMoveBucketIsSuccessful() {    
+    doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify the details is updated with bytes transfered
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+        verify(details, times(1)).incTransfers(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
+      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+        verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
+    }
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startBucketTransfer(anyInt());
+    verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldDoNotRecordBytesTransferedIfMoveBucketFails() {
+    doReturn(false).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify the details is not updated with bytes transfered
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+        verify(details, times(0)).incTransfers(anyLong(), anyLong());
+    }
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startBucketTransfer(anyInt());
+    verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldDelegateRemoveBucketToEnclosedOperator() {    
+    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify the delegate is invoked
+    verify(delegate, times(1)).removeBucket(targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startBucketRemove(anyInt());
+    verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldRecordBucketRemovesPerRegionAfterRemoveBucketIsSuccessful() {    
+    doReturn(true).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify the details is updated with bytes transfered
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+        verify(details, times(1)).incRemoves((eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME))), anyLong());
+      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+        verify(details, times(1)).incRemoves(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
+    }
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startBucketRemove(anyInt());
+    verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldDoNotRecordBucketRemovesIfMoveBucketFails() {
+    doReturn(false).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+    
+    //verify the details is not updated with bytes transfered
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+        verify(details, times(0)).incTransfers(anyLong(), anyLong());
+    }
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startBucketRemove(anyInt());
+    verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldDelegateMovePrimaryToEnclosedOperator() {    
+    wrapper.movePrimary(sourceMember, targetMember, bucketId);
+    
+    //verify the delegate is invoked
+    verify(delegate, times(1)).movePrimary(sourceMember, targetMember, bucketId);
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startPrimaryTransfer(anyInt());
+    verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldRecordPrimaryTransfersPerRegionAfterMovePrimaryIsSuccessful() {    
+    doReturn(true).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
+
+    wrapper.movePrimary(sourceMember, targetMember, bucketId);
+    
+    //verify the details is updated with bytes transfered
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
+        verify(details, times(1)).incPrimaryTransfers(anyLong());
+      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
+        verify(details, times(1)).incPrimaryTransfers(0); //elapsed is recorded only if its leader
+    }
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startPrimaryTransfer(anyInt());
+    verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
+  }
+  
+  @Test
+  public void bucketWrapperShouldNotRecordPrimaryTransfersPerRegionAfterMovePrimaryFails() {
+    doReturn(false).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
+
+    wrapper.movePrimary(sourceMember, targetMember, bucketId);
+    
+    //verify the details is not updated with bytes transfered
+    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
+        verify(details, times(0)).incTransfers(anyLong(), anyLong());
+    }
+    
+    //verify we recorded necessary stats
+    verify(stats, times(1)).startPrimaryTransfer(anyInt());
+    verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
+  }
 }