You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/04/11 20:18:51 UTC

incubator-geode git commit: Revert "GEODE-1153: BucketOperatorWrapper fails to invoke completion callback."

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 616bc27de -> 7c1d867d8


Revert "GEODE-1153: BucketOperatorWrapper fails to invoke completion callback."

 Reverting the fix for GEODE-1153 because of failures that the fix
 itself introduced: the rebalance operation hangs due to a concurrent
 modification of the TreeSet that holds the bucket information
 during the rebalance operation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7c1d867d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7c1d867d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7c1d867d

Branch: refs/heads/develop
Commit: 7c1d867d82b0dc45954f1d996b5f327a73abcf1d
Parents: 616bc27
Author: Sai Boorlagadda <sb...@pivotal.io>
Authored: Mon Apr 11 10:25:52 2016 -0700
Committer: Sai Boorlagadda <sb...@pivotal.io>
Committed: Mon Apr 11 10:31:34 2016 -0700

----------------------------------------------------------------------
 .../PartitionedRegionRebalanceOp.java           | 322 ++++++++++++++++--
 .../rebalance/BucketOperatorImpl.java           |  78 -----
 .../rebalance/BucketOperatorWrapper.java        | 235 --------------
 .../control/RebalanceOperationDUnitTest.java    | 148 +--------
 .../rebalance/BucketOperatorImplTest.java       | 138 --------
 .../rebalance/BucketOperatorWrapperTest.java    | 323 -------------------
 6 files changed, 296 insertions(+), 948 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 641d43d..8642876 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -47,8 +47,6 @@ import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag
 import com.gemstone.gemfire.internal.cache.partitioned.MoveBucketMessage.MoveBucketResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.ParallelBucketOperator;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
@@ -415,9 +413,9 @@ public class PartitionedRegionRebalanceOp {
     }
     BucketOperator operator = simulate ? 
         new SimulatedBucketOperator() 
-        : new BucketOperatorImpl(this);
+        : new BucketOperatorImpl();
     BucketOperatorWrapper wrapper = new BucketOperatorWrapper(
-        operator, rebalanceDetails, stats, leaderRegion);
+        operator, rebalanceDetails);
     return wrapper;
   }
 
@@ -511,12 +509,17 @@ public class PartitionedRegionRebalanceOp {
    *          the member on which to create the redundant bucket
    * @param bucketId
    *          the identifier of the bucket
+   * @param pr
+   *          the partitioned region which contains the bucket
+   * @param forRebalance
+   *          true if part of a rebalance operation
    * @return true if the redundant bucket was created
    */
-  public boolean createRedundantBucketForRegion(
-      InternalDistributedMember target, int bucketId) {
-    return getLeaderRegion().getRedundancyProvider().createBackupBucketOnMember(bucketId,
-        target, isRebalance, replaceOfflineData,null, true);
+  public static boolean createRedundantBucketForRegion(
+      InternalDistributedMember target, int bucketId, PartitionedRegion pr,
+      boolean forRebalance, boolean replaceOfflineData) {
+    return pr.getRedundancyProvider().createBackupBucketOnMember(bucketId,
+        target, forRebalance, replaceOfflineData,null, true);
   }
   
   /**
@@ -526,18 +529,20 @@ public class PartitionedRegionRebalanceOp {
    *          the member on which to create the redundant bucket
    * @param bucketId
    *          the identifier of the bucket
+   * @param pr
+   *          the partitioned region which contains the bucket
    * @return true if the redundant bucket was removed
    */
-  public boolean removeRedundantBucketForRegion(
-      InternalDistributedMember target, int bucketId) {
+  public static boolean removeRedundantBucketForRegion(
+      InternalDistributedMember target, int bucketId, PartitionedRegion pr) {
     boolean removed = false;
-    if (getLeaderRegion().getDistributionManager().getId().equals(target)) {
+    if (pr.getDistributionManager().getId().equals(target)) {
       // invoke directly on local member...
-      removed = getLeaderRegion().getDataStore().removeBucket(bucketId, false);
+      removed = pr.getDataStore().removeBucket(bucketId, false);
     }
     else {
       // send message to remote member...
-      RemoveBucketResponse response = RemoveBucketMessage.send(target, getLeaderRegion(),
+      RemoveBucketResponse response = RemoveBucketMessage.send(target, pr,
           bucketId, false);
       if (response != null) {
         removed = response.waitForResponse();
@@ -553,23 +558,28 @@ public class PartitionedRegionRebalanceOp {
    *          the member which should be primary
    * @param bucketId
    *          the identifier of the bucket
+   * @param pr
+   *          the partitioned region which contains the bucket
+   * @param forRebalance
+   *          true if part of a rebalance operation
    * @return true if the move was successful
    */
-  public boolean movePrimaryBucketForRegion(
-      InternalDistributedMember target, int bucketId) {
+  public static boolean movePrimaryBucketForRegion(
+      InternalDistributedMember target, int bucketId, PartitionedRegion pr,
+      boolean forRebalance) {
     boolean movedPrimary = false;
-    if (getLeaderRegion().getDistributionManager().getId().equals(target)) {
+    if (pr.getDistributionManager().getId().equals(target)) {
       // invoke directly on local member...
-      BucketAdvisor bucketAdvisor = getLeaderRegion().getRegionAdvisor().getBucketAdvisor(
+      BucketAdvisor bucketAdvisor = pr.getRegionAdvisor().getBucketAdvisor(
           bucketId);
       if (bucketAdvisor.isHosting()) {
-        movedPrimary = bucketAdvisor.becomePrimary(isRebalance);
+        movedPrimary = bucketAdvisor.becomePrimary(forRebalance);
       }
     }
     else {
       // send message to remote member...
       BecomePrimaryBucketResponse response = BecomePrimaryBucketMessage.send(
-          target, getLeaderRegion(), bucketId, isRebalance);
+          target, pr, bucketId, forRebalance);
       if (response != null) {
         movedPrimary = response.waitForResponse();
       }
@@ -586,18 +596,20 @@ public class PartitionedRegionRebalanceOp {
    *          member which should receive the bucket
    * @param bucketId
    *          the identifier of the bucket
+   * @param pr
+   *          the partitioned region which contains the bucket
    * @return true if the bucket was moved
    */
-  public boolean moveBucketForRegion(InternalDistributedMember source,
-      InternalDistributedMember target, int bucketId) {
+  public static boolean moveBucketForRegion(InternalDistributedMember source,
+      InternalDistributedMember target, int bucketId, PartitionedRegion pr) {
     boolean movedBucket = false;
-    if (getLeaderRegion().getDistributionManager().getId().equals(target)) {
+    if (pr.getDistributionManager().getId().equals(target)) {
       // invoke directly on local member...
-      movedBucket = getLeaderRegion().getDataStore().moveBucket(bucketId, source, false);
+      movedBucket = pr.getDataStore().moveBucket(bucketId, source, false);
     }
     else {
       // send message to remote member...
-      MoveBucketResponse response = MoveBucketMessage.send(target, getLeaderRegion(),
+      MoveBucketResponse response = MoveBucketMessage.send(target, pr,
           bucketId, source);
       if (response != null) {
         movedBucket = response.waitForResponse();
@@ -614,10 +626,6 @@ public class PartitionedRegionRebalanceOp {
            leaderRegion.getDataPolicy().withPersistence());
   }
   
-  public PartitionedRegion getLeaderRegion() {
-    return leaderRegion;
-  }
-  
   private class MembershipChangeListener implements MembershipListener {
 
     public void memberDeparted(InternalDistributedMember id, boolean crashed) {
@@ -642,4 +650,262 @@ public class PartitionedRegionRebalanceOp {
     public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
     }
   }
+  
+  private class BucketOperatorImpl implements BucketOperator {
+
+    @Override
+    public boolean moveBucket(InternalDistributedMember source,
+        InternalDistributedMember target, int bucketId,
+        Map<String, Long> colocatedRegionBytes) {
+
+      InternalResourceManager.getResourceObserver().movingBucket(
+          leaderRegion, bucketId, source, target);
+      return moveBucketForRegion(source, target, bucketId, leaderRegion);
+    }
+
+    @Override
+    public boolean movePrimary(InternalDistributedMember source,
+        InternalDistributedMember target, int bucketId) {
+
+      InternalResourceManager.getResourceObserver().movingPrimary(
+          leaderRegion, bucketId, source, target);
+      return movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance); 
+    }
+
+    @Override
+    public void createRedundantBucket(
+        InternalDistributedMember targetMember, int bucketId,
+        Map<String, Long> colocatedRegionBytes, Completion completion) {
+      boolean result = false;
+      try {
+        result = createRedundantBucketForRegion(targetMember, bucketId,
+          leaderRegion, isRebalance,replaceOfflineData);
+      } finally {
+        if(result) {
+          completion.onSuccess();
+        } else {
+          completion.onFailure();
+        }
+      }
+    }
+    
+    @Override
+    public void waitForOperations() {
+      //do nothing, all operations are synchronous
+    }
+
+    @Override
+    public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
+        Map<String, Long> colocatedRegionBytes) {
+      return removeRedundantBucketForRegion(targetMember, bucketId,
+          leaderRegion);
+    }
+  }
+
+  /**
+   * A wrapper class which delegates actual bucket operations to the enclosed BucketOperator,
+   * but keeps track of statistics about how many buckets are created, transfered, etc.
+   *
+   */
+  private class BucketOperatorWrapper implements 
+      BucketOperator {
+    private final BucketOperator delegate;
+    private final Set<PartitionRebalanceDetailsImpl> detailSet;
+    private final int regionCount;
+  
+    public BucketOperatorWrapper(
+        BucketOperator delegate,
+        Set<PartitionRebalanceDetailsImpl> rebalanceDetails) {
+      this.delegate = delegate;
+      this.detailSet = rebalanceDetails;
+      this.regionCount = detailSet.size();
+    }
+    @Override
+    public boolean moveBucket(InternalDistributedMember sourceMember,
+        InternalDistributedMember targetMember, int id,
+        Map<String, Long> colocatedRegionBytes) {
+      long start = System.nanoTime();
+      boolean result = false;
+      long elapsed = 0;
+      long totalBytes = 0;
+
+
+      if (stats != null) {
+        stats.startBucketTransfer(regionCount);
+      }
+      try {
+        result = delegate.moveBucket(sourceMember, targetMember, id,
+            colocatedRegionBytes);
+        elapsed = System.nanoTime() - start;
+        if (result) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
+          }
+          for (PartitionRebalanceDetailsImpl details : detailSet) {
+            String regionPath = details.getRegionPath();
+            Long regionBytes = colocatedRegionBytes.get(regionPath);
+            if(regionBytes != null) {
+            //only increment the elapsed time for the leader region
+              details.incTransfers(regionBytes.longValue(),
+                  details.getRegion().equals(leaderRegion) ? elapsed : 0);
+              totalBytes += regionBytes.longValue();
+            }
+          }
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
+          }
+        }
+      } finally {
+        if(stats != null) {
+          stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
+        }
+      }
+      
+      return result;
+    }
+
+    @Override
+    public void createRedundantBucket(
+        final InternalDistributedMember targetMember, final int i, 
+        final Map<String, Long> colocatedRegionBytes, final Completion completion) {
+      
+      if(stats != null) {
+        stats.startBucketCreate(regionCount);
+      }
+      
+      final long start = System.nanoTime();
+      delegate.createRedundantBucket(targetMember, i,  
+          colocatedRegionBytes, new Completion() {
+
+        @Override
+        public void onSuccess() {
+          long totalBytes = 0;
+          long elapsed= System.nanoTime() - start;
+          if(logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
+          }
+          for (PartitionRebalanceDetailsImpl details : detailSet) {
+            String regionPath = details.getRegionPath();
+            Long lrb = colocatedRegionBytes.get(regionPath);
+            if (lrb != null) { // region could have gone away - esp during shutdow
+              long regionBytes = lrb.longValue();
+              //Only add the elapsed time to the leader region.
+              details.incCreates(regionBytes, 
+                  details.getRegion().equals(leaderRegion) ? elapsed : 0);
+              totalBytes += regionBytes;
+            }
+          }
+
+          if(stats != null) {
+            stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
+          }
+
+        }
+
+        @Override
+        public void onFailure() {
+          long elapsed= System.nanoTime() - start;
+
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
+          }
+
+          if(stats != null) {
+            stats.endBucketCreate(regionCount, false, 0, elapsed);
+          }
+        }
+      });
+    }
+    
+    @Override
+    public boolean removeBucket(
+        InternalDistributedMember targetMember, int i, 
+        Map<String, Long> colocatedRegionBytes) {
+      boolean result = false;
+      long elapsed = 0;
+      long totalBytes = 0;
+      
+      
+      if(stats != null) {
+        stats.startBucketRemove(regionCount);
+      }
+      try {
+        long start = System.nanoTime();
+        result = delegate.removeBucket(targetMember, i,  
+            colocatedRegionBytes);
+        elapsed= System.nanoTime() - start;
+        if (result) {
+          if(logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
+          }
+          for (PartitionRebalanceDetailsImpl details : detailSet) {
+            String regionPath = details.getRegionPath();
+            Long lrb = colocatedRegionBytes.get(regionPath);
+            if (lrb != null) { // region could have gone away - esp during shutdow
+              long regionBytes = lrb.longValue();
+              //Only add the elapsed time to the leader region.
+              details.incRemoves(regionBytes, 
+                  details.getRegion().equals(leaderRegion) ? elapsed : 0);
+              totalBytes += regionBytes;
+            }
+          }
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember);
+          }
+        }
+      } finally {
+        if(stats != null) {
+          stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
+        }
+      }
+      
+      return result;
+    }
+  
+    @Override
+    public boolean movePrimary(InternalDistributedMember source,
+        InternalDistributedMember target, int bucketId) {
+      boolean result = false;
+      long elapsed = 0;
+      
+      if(stats != null) {
+        stats.startPrimaryTransfer(regionCount);
+      }
+
+      try {
+        long start = System.nanoTime();
+        result = delegate.movePrimary(source, target, bucketId);
+        elapsed = System.nanoTime() - start;
+        if (result) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
+          }
+          for (PartitionRebalanceDetailsImpl details : detailSet) {
+            details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
+          }
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
+          }
+        }
+    } finally {
+      if(stats != null) {
+        stats.endPrimaryTransfer(regionCount, result, elapsed);
+      }
+    }
+      
+      return result;
+    }
+    
+    @Override
+    public void waitForOperations() {
+      delegate.waitForOperations();
+    }
+
+    public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
+      return this.detailSet;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
deleted file mode 100644
index 2f38752..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import java.util.Map;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
-import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
-
-public class BucketOperatorImpl implements BucketOperator {
-  
-  private PartitionedRegionRebalanceOp rebalanceOp;
-
-  public BucketOperatorImpl(PartitionedRegionRebalanceOp rebalanceOp) {
-    this.rebalanceOp = rebalanceOp;
-  }
-
-  @Override
-  public boolean moveBucket(InternalDistributedMember source,
-      InternalDistributedMember target, int bucketId,
-      Map<String, Long> colocatedRegionBytes) {
-
-    InternalResourceManager.getResourceObserver().movingBucket(
-        rebalanceOp.getLeaderRegion(), bucketId, source, target);
-    return rebalanceOp.moveBucketForRegion(source, target, bucketId);
-  }
-
-  @Override
-  public boolean movePrimary(InternalDistributedMember source,
-      InternalDistributedMember target, int bucketId) {
-
-    InternalResourceManager.getResourceObserver().movingPrimary(
-        rebalanceOp.getLeaderRegion(), bucketId, source, target);
-    return rebalanceOp.movePrimaryBucketForRegion(target, bucketId); 
-  }
-
-  @Override
-  public void createRedundantBucket(
-      InternalDistributedMember targetMember, int bucketId,
-      Map<String, Long> colocatedRegionBytes, Completion completion) {
-    boolean result = false;
-    try {
-      result = rebalanceOp.createRedundantBucketForRegion(targetMember, bucketId);
-    } finally {
-      if(result) {
-        completion.onSuccess();
-      } else {
-        completion.onFailure();
-      }
-    }
-  }
-  
-  @Override
-  public void waitForOperations() {
-    //do nothing, all operations are synchronous
-  }
-
-  @Override
-  public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
-      Map<String, Long> colocatedRegionBytes) {
-    return rebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
deleted file mode 100644
index d058a04..0000000
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
-import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
-import com.gemstone.gemfire.internal.logging.LogService;
-
-public class BucketOperatorWrapper implements BucketOperator {
-  private static final Logger logger = LogService.getLogger();
-  
-  private final BucketOperator delegate;
-  private final Set<PartitionRebalanceDetailsImpl> detailSet;
-  private final int regionCount;
-  private final ResourceManagerStats stats;
-  private final PartitionedRegion leaderRegion;
-
-  public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails, 
-      ResourceManagerStats stats, PartitionedRegion leaderRegion) {
-    this.delegate = delegate;
-    this.detailSet = rebalanceDetails;
-    this.regionCount = detailSet.size();
-    this.stats = stats;
-    this.leaderRegion = leaderRegion;
-  }
-
-  @Override
-  public boolean moveBucket(InternalDistributedMember sourceMember, 
-      InternalDistributedMember targetMember, int id, 
-      Map<String, Long> colocatedRegionBytes) {
-    long start = System.nanoTime();
-    boolean result = false;
-    long elapsed = 0;
-    long totalBytes = 0;
-
-    if (stats != null) {
-      stats.startBucketTransfer(regionCount);
-    }
-    try {
-      result = delegate.moveBucket(sourceMember, targetMember, id, colocatedRegionBytes);
-      elapsed = System.nanoTime() - start;
-      if (result) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
-        }
-        for (PartitionRebalanceDetailsImpl details : detailSet) {
-          String regionPath = details.getRegionPath();
-          Long regionBytes = colocatedRegionBytes.get(regionPath);
-          if (regionBytes != null) {
-            // only increment the elapsed time for the leader region
-            details.incTransfers(regionBytes.longValue(), 
-                details.getRegion().equals(leaderRegion) ? elapsed : 0);
-            totalBytes += regionBytes.longValue();
-          }
-        }
-      } else {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
-        }
-      }
-    } finally {
-      if (stats != null) {
-        stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
-      }
-    }
-
-    return result;
-  }
-
-  @Override
-  public void createRedundantBucket(
-      final InternalDistributedMember targetMember, final int i, 
-      final Map<String, Long> colocatedRegionBytes, final Completion completion) {
-
-    if (stats != null) {
-      stats.startBucketCreate(regionCount);
-    }
-
-    final long start = System.nanoTime();
-    delegate.createRedundantBucket(targetMember, i, 
-        colocatedRegionBytes, new Completion() {
-
-      @Override
-      public void onSuccess() {
-        long totalBytes = 0;
-        long elapsed = System.nanoTime() - start;
-        if (logger.isDebugEnabled()) {
-          logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
-        }
-        for (PartitionRebalanceDetailsImpl details : detailSet) {
-          String regionPath = details.getRegionPath();
-          Long lrb = colocatedRegionBytes.get(regionPath);
-          if (lrb != null) { // region could have gone away - esp during shutdow
-            long regionBytes = lrb.longValue();
-            // Only add the elapsed time to the leader region.
-            details.incCreates(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
-            totalBytes += regionBytes;
-          }
-        }
-
-        if (stats != null) {
-          stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
-        }
-        
-        //invoke onSuccess on the received completion callback
-        completion.onSuccess();
-      }
-
-      @Override
-      public void onFailure() {
-        long elapsed = System.nanoTime() - start;
-
-        if (logger.isDebugEnabled()) {
-          logger.info("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
-        }
-
-        if (stats != null) {
-          stats.endBucketCreate(regionCount, false, 0, elapsed);
-        }
-        
-        //invoke onFailure on the received completion callback
-        completion.onFailure();
-      }
-    });
-  }
-
-  @Override
-  public boolean removeBucket(
-      InternalDistributedMember targetMember, int i, 
-      Map<String, Long> colocatedRegionBytes) {
-    boolean result = false;
-    long elapsed = 0;
-    long totalBytes = 0;
-
-    if (stats != null) {
-      stats.startBucketRemove(regionCount);
-    }
-    try {
-      long start = System.nanoTime();
-      result = delegate.removeBucket(targetMember, i, colocatedRegionBytes);
-      elapsed = System.nanoTime() - start;
-      if (result) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
-        }
-        for (PartitionRebalanceDetailsImpl details : detailSet) {
-          String regionPath = details.getRegionPath();
-          Long lrb = colocatedRegionBytes.get(regionPath);
-          if (lrb != null) { // region could have gone away - esp during shutdow
-            long regionBytes = lrb.longValue();
-            // Only add the elapsed time to the leader region.
-            details.incRemoves(regionBytes, 
-                details.getRegion().equals(leaderRegion) ? elapsed : 0);
-            totalBytes += regionBytes;
-          }
-        }
-      } else {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember);
-        }
-      }
-    } finally {
-      if (stats != null) {
-        stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
-      }
-    }
-
-    return result;
-  }
-
-  @Override
-  public boolean movePrimary(InternalDistributedMember source, 
-      InternalDistributedMember target, int bucketId) {
-    boolean result = false;
-    long elapsed = 0;
-
-    if (stats != null) {
-      stats.startPrimaryTransfer(regionCount);
-    }
-
-    try {
-      long start = System.nanoTime();
-      result = delegate.movePrimary(source, target, bucketId);
-      elapsed = System.nanoTime() - start;
-      if (result) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
-        }
-        for (PartitionRebalanceDetailsImpl details : detailSet) {
-          details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
-        }
-      } else {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
-        }
-      }
-    } finally {
-      if (stats != null) {
-        stats.endPrimaryTransfer(regionCount, result, elapsed);
-      }
-    }
-
-    return result;
-  }
-
-  @Override
-  public void waitForOperations() {
-    delegate.waitForOperations();
-  }
-
-  public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
-    return this.detailSet;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index a5d2a03..26ebc16 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -16,13 +16,6 @@
  */
 package com.gemstone.gemfire.internal.cache.control;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,6 +43,7 @@ import com.gemstone.gemfire.cache.DiskStoreFactory;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.EvictionAction;
 import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.GemFireCache;
 import com.gemstone.gemfire.cache.LoaderHelper;
 import com.gemstone.gemfire.cache.PartitionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
@@ -69,12 +63,11 @@ import com.gemstone.gemfire.cache30.CacheTestCase;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.ColocationHelper;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider;
+import com.gemstone.gemfire.internal.cache.InternalCache;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
@@ -979,143 +972,6 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
     return (DistributedMember) vm.invoke(createPrRegion);
   }
   
-  public void testRecoverRedundancyBalancingIfCreateBucketFails() {
-    Host host = Host.getHost(0);
-    VM vm0 = host.getVM(0);
-    VM vm1 = host.getVM(1);
-    VM vm2 = host.getVM(2);
-    
-
-    final DistributedMember member1 = createPrRegion(vm0, "region1", 100, null);
-    
-    vm0.invoke(new SerializableRunnable("createSomeBuckets") {
-      
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion("region1");
-        for(int i = 0; i < 1; i++) {
-          region.put(Integer.valueOf(i), "A");
-        }
-      }
-    });
-    
-    
-    SerializableRunnable checkRedundancy= new SerializableRunnable("checkRedundancy") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion("region1");
-        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
-        assertEquals(1, details.getCreatedBucketCount());
-        assertEquals(0,details.getActualRedundantCopies());
-        assertEquals(1,details.getLowRedundancyBucketCount());
-      }
-    };
-    
-    vm0.invoke(checkRedundancy);
-        
-    //Now create the region in 2 more VMs
-    //Let localMaxMemory(VM1) > localMaxMemory(VM2)
-    //so that redundant bucket will always be attempted on VM1
-    final DistributedMember member2 = createPrRegion(vm1, "region1", 100, null);
-    final DistributedMember member3 = createPrRegion(vm2, "region1", 90, null);
-    
-    vm0.invoke(checkRedundancy);
-    
-    //Inject mock PRHARedundancyProvider to simulate createBucketFailures
-    vm0.invoke(new SerializableRunnable("injectCreateBucketFailureAndRebalance") {
-      
-      @Override
-      public void run() {
-        GemFireCacheImpl cache = spy(getGemfireCache());
-        //set the spied cache instance
-        GemFireCacheImpl origCache = GemFireCacheImpl.setInstanceForTests(cache);
-        
-        PartitionedRegion origRegion = (PartitionedRegion) cache.getRegion("region1");
-        PartitionedRegion spyRegion = spy(origRegion);
-        PRHARedundancyProvider redundancyProvider = spy(new PRHARedundancyProvider(spyRegion));   
-
-        //return the spied region when ever getPartitionedRegions() is invoked
-        Set<PartitionedRegion> parRegions = cache.getPartitionedRegions();
-        parRegions.remove(origRegion);
-        parRegions.add(spyRegion);
-
-        doReturn(parRegions).when(cache).getPartitionedRegions();
-        doReturn(redundancyProvider).when(spyRegion).getRedundancyProvider();
-        
-        //simulate create bucket fails on member2 and test if it creates on member3
-        doReturn(false).when(redundancyProvider).createBackupBucketOnMember(anyInt(), eq((InternalDistributedMember) member2), anyBoolean(), anyBoolean(), any(), anyBoolean());
-
-        //Now simulate a rebalance
-        //Create operationImpl and not factory as we need spied cache to be passed to operationImpl
-        RegionFilter filter = new FilterByPath(null, null);
-        RebalanceOperationImpl operation = new RebalanceOperationImpl(cache, false, filter);
-        operation.start();
-        RebalanceResults results = null;
-        try {
-          results = operation.getResults(MAX_WAIT, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-          Assert.fail("Interrupted waiting on rebalance", e);
-        } catch (TimeoutException e) {
-          Assert.fail("Timeout waiting on rebalance", e);
-        }
-        assertEquals(1, results.getTotalBucketCreatesCompleted());
-        assertEquals(0, results.getTotalPrimaryTransfersCompleted());
-        assertEquals(0, results.getTotalBucketTransferBytes());
-        assertEquals(0, results.getTotalBucketTransfersCompleted());
-        Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
-        assertEquals(1, detailSet.size());
-        PartitionRebalanceInfo details = detailSet.iterator().next();
-        assertEquals(1, details.getBucketCreatesCompleted());
-        assertEquals(0, details.getPrimaryTransfersCompleted());
-        assertEquals(0, details.getBucketTransferBytes());
-        assertEquals(0, details.getBucketTransfersCompleted());
-        
-        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
-        assertEquals(3, afterDetails.size());
-        for(PartitionMemberInfo memberDetails: afterDetails) {
-          if(memberDetails.getDistributedMember().equals(member1)) {
-            assertEquals(1, memberDetails.getBucketCount());
-            assertEquals(1, memberDetails.getPrimaryCount());
-          } else if(memberDetails.getDistributedMember().equals(member2)) {
-            assertEquals(0, memberDetails.getBucketCount());
-            assertEquals(0, memberDetails.getPrimaryCount());
-          } else if(memberDetails.getDistributedMember().equals(member3)) {
-            assertEquals(1, memberDetails.getBucketCount());
-            assertEquals(0, memberDetails.getPrimaryCount());
-          }
-        }
-        
-        ResourceManagerStats stats = cache.getResourceManager().getStats();
-        
-        assertEquals(0, stats.getRebalancesInProgress());
-        assertEquals(1, stats.getRebalancesCompleted());
-        assertEquals(0, stats.getRebalanceBucketCreatesInProgress());
-        assertEquals(results.getTotalBucketCreatesCompleted(), stats.getRebalanceBucketCreatesCompleted());
-        assertEquals(1, stats.getRebalanceBucketCreatesFailed());
-        
-        //set the original cache
-        GemFireCacheImpl.setInstanceForTests(origCache);
-      }
-    });
-    
-    SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
-
-      public void run() {
-        Cache cache = getCache();
-        Region region = cache.getRegion("region1");
-        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
-        assertEquals(1, details.getCreatedBucketCount());
-        assertEquals(1,details.getActualRedundantCopies());
-        assertEquals(0,details.getLowRedundancyBucketCount());
-      }
-    };
-
-    vm0.invoke(checkRedundancyFixed);
-    vm1.invoke(checkRedundancyFixed);
-    vm2.invoke(checkRedundancyFixed);
-  }
-  
   public void testRecoverRedundancyColocatedRegionsSimulation() {
     recoverRedundancyColocatedRegions(true);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
deleted file mode 100644
index a5c8982..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
-import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class BucketOperatorImplTest {
-
-  private InternalResourceManager.ResourceObserver resourceObserver;
-
-  private BucketOperatorImpl operator;
-
-  private PartitionedRegion region;
-  private PartitionedRegionRebalanceOp rebalanceOp;
-  private Completion completion;
-
-  private Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
-  private int bucketId = 1;
-  private InternalDistributedMember sourceMember, targetMember;
-
-  @Before
-  public void setup() throws UnknownHostException {
-    region = mock(PartitionedRegion.class);
-    rebalanceOp = mock(PartitionedRegionRebalanceOp.class);
-    completion = mock(Completion.class);
-    
-    resourceObserver = spy(new InternalResourceManager.ResourceObserverAdapter());
-    InternalResourceManager.setResourceObserver(resourceObserver);
-    
-    doReturn(region).when(rebalanceOp).getLeaderRegion();
-
-    operator = new BucketOperatorImpl(rebalanceOp);
-
-    sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
-    targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
-  }
-
-  @After
-  public void after() {
-    reset(resourceObserver);
-  }
-
-  @Test
-  public void moveBucketShouldDelegateToParRegRebalanceOpMoveBucketForRegion() throws UnknownHostException {
-    doReturn(true).when(rebalanceOp).moveBucketForRegion(sourceMember, targetMember, bucketId);
-
-    operator.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
-    verify(resourceObserver, times(1)).movingBucket(region, bucketId, sourceMember, targetMember);
-    verify(rebalanceOp, times(1)).moveBucketForRegion(sourceMember, targetMember, bucketId);
-  }
-
-  @Test
-  public void movePrimaryShouldDelegateToParRegRebalanceOpMovePrimaryBucketForRegion() throws UnknownHostException {
-    doReturn(true).when(rebalanceOp).movePrimaryBucketForRegion(targetMember, bucketId);
-
-    operator.movePrimary(sourceMember, targetMember, bucketId);
-
-    verify(resourceObserver, times(1)).movingPrimary(region, bucketId, sourceMember, targetMember);
-    verify(rebalanceOp, times(1)).movePrimaryBucketForRegion(targetMember, bucketId);
-  }
-
-  @Test
-  public void createBucketShouldDelegateToParRegRebalanceOpCreateRedundantBucketForRegion() throws UnknownHostException {
-    doReturn(true).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId);
-
-    operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
-
-    verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId);
-  }
-
-  @Test
-  public void createBucketShouldInvokeOnSuccessIfCreateBucketSucceeds() {
-    doReturn(true).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId);
-
-    operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
-
-    verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId);
-    verify(completion, times(1)).onSuccess();
-  }
-
-  @Test
-  public void createBucketShouldInvokeOnFailureIfCreateBucketFails() {
-    doReturn(false).when(rebalanceOp).createRedundantBucketForRegion(targetMember, bucketId); //return false for create fail
-
-    operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
-
-    verify(rebalanceOp, times(1)).createRedundantBucketForRegion(targetMember, bucketId);
-    verify(completion, times(1)).onFailure();
-  }
-
-  @Test
-  public void removeBucketShouldDelegateToParRegRebalanceOpRemoveRedundantBucketForRegion() {
-    doReturn(true).when(rebalanceOp).removeRedundantBucketForRegion(targetMember, bucketId);
-
-    operator.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
-    verify(rebalanceOp, times(1)).removeRedundantBucketForRegion(targetMember, bucketId);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7c1d867d/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
deleted file mode 100644
index 558062b..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.spy;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
-import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
-import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
-import com.gemstone.gemfire.test.junit.categories.UnitTest;
-
-@Category(UnitTest.class)
-public class BucketOperatorWrapperTest {
-  
-  private ResourceManagerStats stats;
-  private PartitionedRegion leaderRegion;
-  private PartitionedRegion colocatedRegion;
-  private Set<PartitionRebalanceDetailsImpl> rebalanceDetails;
-  private BucketOperatorWrapper wrapper;
-  private BucketOperatorImpl delegate;
-  
-  private Map<String, Long> colocatedRegionBytes;
-  private int bucketId = 1;
-  private InternalDistributedMember sourceMember, targetMember;
-  
-  private final static String PR_LEADER_REGION_NAME = "leadregion1";
-  private final static String PR_COLOCATED_REGION_NAME = "coloregion1";
-  
-  @Before
-  public void setUp() throws UnknownHostException {
-    colocatedRegionBytes = new HashMap<String, Long>();
-    colocatedRegionBytes.put(PR_LEADER_REGION_NAME, 100L);
-    colocatedRegionBytes.put(PR_COLOCATED_REGION_NAME, 50L);
-    
-    sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
-    targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
-
-    stats = mock(ResourceManagerStats.class);
-    doNothing().when(stats).startBucketCreate(anyInt());
-    doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong());
-    
-    leaderRegion = mock(PartitionedRegion.class);
-    doReturn(PR_LEADER_REGION_NAME).when(leaderRegion).getFullPath();
-    colocatedRegion = mock(PartitionedRegion.class);
-    doReturn(PR_COLOCATED_REGION_NAME).when(colocatedRegion).getFullPath();
-    
-    rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
-    PartitionRebalanceDetailsImpl details = spy(new PartitionRebalanceDetailsImpl(leaderRegion));
-    rebalanceDetails.add(details);
-    
-    delegate = mock(BucketOperatorImpl.class);
-    
-    wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, leaderRegion);
-  }
-  
-  @Test
-  public void bucketWrapperShouldDelegateCreateBucketToEnclosedOperator() {
-    Completion completionSentToWrapper = mock(Completion.class);
-
-    doNothing().when(delegate).createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-
-    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-    
-    verify(delegate, times(1)).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-  }
-  
-  @Test
-  public void bucketWrapperShouldRecordNumberOfBucketsCreatedIfCreateBucketSucceeds() {
-    doAnswer(new Answer<Object>() {
-      public Object answer(InvocationOnMock invocation) {
-        //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
-        ((Completion) invocation.getArguments()[3]).onSuccess();
-        return null;
-      }
-    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-    
-    Completion completionSentToWrapper = mock(Completion.class);
-    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-    
-    //verify create buckets is recorded
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
-        verify(details, times(1)).incCreates(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
-      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
-        verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
-    }
-  }
-  
-  @Test
-  public void bucketWrapperShouldNotRecordNumberOfBucketsCreatedIfCreateBucketFails() {
-    doAnswer(new Answer<Object>() {
-      public Object answer(InvocationOnMock invocation) {
-        //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
-        ((Completion) invocation.getArguments()[3]).onFailure();
-        return null;
-      }
-    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-    
-    Completion completionSentToWrapper = mock(Completion.class);
-    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-    
-    //verify create buckets is not recorded
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-      verify(details, times(0)).incTransfers(anyLong(), anyLong());
-    }
-  }
-  
-  @Test
-  public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() {        
-    doAnswer(new Answer<Object>() {
-      public Object answer(InvocationOnMock invocation) {
-        //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
-        ((Completion) invocation.getArguments()[3]).onFailure();
-        return null;
-      }
-    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-    
-    Completion completionSentToWrapper = mock(Completion.class);
-    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-    
-    //verify onFailure is invoked
-    verify(completionSentToWrapper, times(1)).onFailure();
-  }
-  
-  @Test
-  public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() {       
-    doAnswer(new Answer<Object>() {
-      public Object answer(InvocationOnMock invocation) {
-        //3rd argument is Completion object sent to BucketOperatorImpl.createRedundantBucket
-        ((Completion) invocation.getArguments()[3]).onSuccess();
-        return null;
-      }
-    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
-    
-    Completion completionSentToWrapper = mock(Completion.class);
-    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
-    
-    verify(completionSentToWrapper, times(1)).onSuccess();
-  }
-  
-  @Test
-  public void bucketWrapperShouldDelegateMoveBucketToEnclosedOperator() {    
-    doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
-    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify the delegate is invoked
-    verify(delegate, times(1)).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startBucketTransfer(anyInt());
-    verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldRecordBytesTransferredPerRegionAfterMoveBucketIsSuccessful() {    
-    doReturn(true).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
-    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify the details is updated with bytes transfered
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
-        verify(details, times(1)).incTransfers(eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME)), anyLong());
-      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
-        verify(details, times(1)).incTransfers(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
-    }
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startBucketTransfer(anyInt());
-    verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldDoNotRecordBytesTransferedIfMoveBucketFails() {
-    doReturn(false).when(delegate).moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-
-    wrapper.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify the details is not updated with bytes transfered
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-        verify(details, times(0)).incTransfers(anyLong(), anyLong());
-    }
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startBucketTransfer(anyInt());
-    verify(stats, times(1)).endBucketTransfer(anyInt(), anyBoolean(), anyLong(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldDelegateRemoveBucketToEnclosedOperator() {    
-    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify the delegate is invoked
-    verify(delegate, times(1)).removeBucket(targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startBucketRemove(anyInt());
-    verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldRecordBucketRemovesPerRegionAfterRemoveBucketIsSuccessful() {    
-    doReturn(true).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
-    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify the details is updated with bytes transfered
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
-        verify(details, times(1)).incRemoves((eq(colocatedRegionBytes.get(PR_LEADER_REGION_NAME))), anyLong());
-      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
-        verify(details, times(1)).incRemoves(colocatedRegionBytes.get(PR_COLOCATED_REGION_NAME), 0); //elapsed is recorded only if its leader
-    }
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startBucketRemove(anyInt());
-    verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldDoNotRecordBucketRemovesIfMoveBucketFails() {
-    doReturn(false).when(delegate).removeBucket(targetMember, bucketId, colocatedRegionBytes);
-
-    wrapper.removeBucket(targetMember, bucketId, colocatedRegionBytes);
-    
-    //verify the details is not updated with bytes transfered
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-        verify(details, times(0)).incTransfers(anyLong(), anyLong());
-    }
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startBucketRemove(anyInt());
-    verify(stats, times(1)).endBucketRemove(anyInt(), anyBoolean(), anyLong(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldDelegateMovePrimaryToEnclosedOperator() {    
-    wrapper.movePrimary(sourceMember, targetMember, bucketId);
-    
-    //verify the delegate is invoked
-    verify(delegate, times(1)).movePrimary(sourceMember, targetMember, bucketId);
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startPrimaryTransfer(anyInt());
-    verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldRecordPrimaryTransfersPerRegionAfterMovePrimaryIsSuccessful() {    
-    doReturn(true).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
-
-    wrapper.movePrimary(sourceMember, targetMember, bucketId);
-    
-    //verify the details is updated with bytes transfered
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-      if(details.getRegionPath().equalsIgnoreCase(PR_LEADER_REGION_NAME))
-        verify(details, times(1)).incPrimaryTransfers(anyLong());
-      else if(details.getRegionPath().equals(PR_COLOCATED_REGION_NAME))
-        verify(details, times(1)).incPrimaryTransfers(0); //elapsed is recorded only if its leader
-    }
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startPrimaryTransfer(anyInt());
-    verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
-  }
-  
-  @Test
-  public void bucketWrapperShouldNotRecordPrimaryTransfersPerRegionAfterMovePrimaryFails() {
-    doReturn(false).when(delegate).movePrimary(sourceMember, targetMember, bucketId);
-
-    wrapper.movePrimary(sourceMember, targetMember, bucketId);
-    
-    //verify the details is not updated with bytes transfered
-    for( PartitionRebalanceDetailsImpl details : rebalanceDetails) {
-        verify(details, times(0)).incTransfers(anyLong(), anyLong());
-    }
-    
-    //verify we recorded necessary stats
-    verify(stats, times(1)).startPrimaryTransfer(anyInt());
-    verify(stats, times(1)).endPrimaryTransfer(anyInt(), anyBoolean(), anyLong());
-  }
-}