You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sa...@apache.org on 2016/04/01 03:19:56 UTC

incubator-geode git commit: Fixed BucketOperatorWrapper to invoke onFailure if bucket creation fails

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1153 [created] b35af269c


Fixed BucketOperatorWrapper to invoke onFailure if bucket creation fails


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b35af269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b35af269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b35af269

Branch: refs/heads/feature/GEODE-1153
Commit: b35af269c1232a5f509cb9dfcfde1dd7ebe5b89c
Parents: 8503fc3
Author: Sai Boorlagadda <sb...@pivotal.io>
Authored: Thu Mar 31 15:37:25 2016 -0700
Committer: Sai Boorlagadda <sb...@pivotal.io>
Committed: Thu Mar 31 15:50:38 2016 -0700

----------------------------------------------------------------------
 .../PartitionedRegionRebalanceOp.java           | 264 +------------------
 .../rebalance/BucketOperatorImpl.java           |  69 +++++
 .../rebalance/BucketOperatorWrapper.java        | 209 +++++++++++++++
 .../control/RebalanceOperationDUnitTest.java    | 167 +++++++++++-
 .../rebalance/BucketOperatorImplTest.java       | 145 ++++++++++
 .../rebalance/BucketOperatorWrapperTest.java    |  97 +++++++
 6 files changed, 689 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 8642876..f36c74f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -47,6 +47,8 @@ import com.gemstone.gemfire.internal.cache.partitioned.BecomePrimaryBucketMessag
 import com.gemstone.gemfire.internal.cache.partitioned.MoveBucketMessage.MoveBucketResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.ParallelBucketOperator;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel;
 import com.gemstone.gemfire.internal.cache.partitioned.rebalance.PartitionedRegionLoadModel.AddressComparor;
@@ -413,9 +415,9 @@ public class PartitionedRegionRebalanceOp {
     }
     BucketOperator operator = simulate ? 
         new SimulatedBucketOperator() 
-        : new BucketOperatorImpl();
+        : new BucketOperatorImpl(leaderRegion, isRebalance, replaceOfflineData);
     BucketOperatorWrapper wrapper = new BucketOperatorWrapper(
-        operator, rebalanceDetails);
+        operator, rebalanceDetails, stats, leaderRegion);
     return wrapper;
   }
 
@@ -650,262 +652,4 @@ public class PartitionedRegionRebalanceOp {
     public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
     }
   }
-  
-  private class BucketOperatorImpl implements BucketOperator {
-
-    @Override
-    public boolean moveBucket(InternalDistributedMember source,
-        InternalDistributedMember target, int bucketId,
-        Map<String, Long> colocatedRegionBytes) {
-
-      InternalResourceManager.getResourceObserver().movingBucket(
-          leaderRegion, bucketId, source, target);
-      return moveBucketForRegion(source, target, bucketId, leaderRegion);
-    }
-
-    @Override
-    public boolean movePrimary(InternalDistributedMember source,
-        InternalDistributedMember target, int bucketId) {
-
-      InternalResourceManager.getResourceObserver().movingPrimary(
-          leaderRegion, bucketId, source, target);
-      return movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance); 
-    }
-
-    @Override
-    public void createRedundantBucket(
-        InternalDistributedMember targetMember, int bucketId,
-        Map<String, Long> colocatedRegionBytes, Completion completion) {
-      boolean result = false;
-      try {
-        result = createRedundantBucketForRegion(targetMember, bucketId,
-          leaderRegion, isRebalance,replaceOfflineData);
-      } finally {
-        if(result) {
-          completion.onSuccess();
-        } else {
-          completion.onFailure();
-        }
-      }
-    }
-    
-    @Override
-    public void waitForOperations() {
-      //do nothing, all operations are synchronous
-    }
-
-    @Override
-    public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
-        Map<String, Long> colocatedRegionBytes) {
-      return removeRedundantBucketForRegion(targetMember, bucketId,
-          leaderRegion);
-    }
-  }
-
-  /**
-   * A wrapper class which delegates actual bucket operations to the enclosed BucketOperator,
-   * but keeps track of statistics about how many buckets are created, transfered, etc.
-   *
-   */
-  private class BucketOperatorWrapper implements 
-      BucketOperator {
-    private final BucketOperator delegate;
-    private final Set<PartitionRebalanceDetailsImpl> detailSet;
-    private final int regionCount;
-  
-    public BucketOperatorWrapper(
-        BucketOperator delegate,
-        Set<PartitionRebalanceDetailsImpl> rebalanceDetails) {
-      this.delegate = delegate;
-      this.detailSet = rebalanceDetails;
-      this.regionCount = detailSet.size();
-    }
-    @Override
-    public boolean moveBucket(InternalDistributedMember sourceMember,
-        InternalDistributedMember targetMember, int id,
-        Map<String, Long> colocatedRegionBytes) {
-      long start = System.nanoTime();
-      boolean result = false;
-      long elapsed = 0;
-      long totalBytes = 0;
-
-
-      if (stats != null) {
-        stats.startBucketTransfer(regionCount);
-      }
-      try {
-        result = delegate.moveBucket(sourceMember, targetMember, id,
-            colocatedRegionBytes);
-        elapsed = System.nanoTime() - start;
-        if (result) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
-          }
-          for (PartitionRebalanceDetailsImpl details : detailSet) {
-            String regionPath = details.getRegionPath();
-            Long regionBytes = colocatedRegionBytes.get(regionPath);
-            if(regionBytes != null) {
-            //only increment the elapsed time for the leader region
-              details.incTransfers(regionBytes.longValue(),
-                  details.getRegion().equals(leaderRegion) ? elapsed : 0);
-              totalBytes += regionBytes.longValue();
-            }
-          }
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
-          }
-        }
-      } finally {
-        if(stats != null) {
-          stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
-        }
-      }
-      
-      return result;
-    }
-
-    @Override
-    public void createRedundantBucket(
-        final InternalDistributedMember targetMember, final int i, 
-        final Map<String, Long> colocatedRegionBytes, final Completion completion) {
-      
-      if(stats != null) {
-        stats.startBucketCreate(regionCount);
-      }
-      
-      final long start = System.nanoTime();
-      delegate.createRedundantBucket(targetMember, i,  
-          colocatedRegionBytes, new Completion() {
-
-        @Override
-        public void onSuccess() {
-          long totalBytes = 0;
-          long elapsed= System.nanoTime() - start;
-          if(logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
-          }
-          for (PartitionRebalanceDetailsImpl details : detailSet) {
-            String regionPath = details.getRegionPath();
-            Long lrb = colocatedRegionBytes.get(regionPath);
-            if (lrb != null) { // region could have gone away - esp during shutdow
-              long regionBytes = lrb.longValue();
-              //Only add the elapsed time to the leader region.
-              details.incCreates(regionBytes, 
-                  details.getRegion().equals(leaderRegion) ? elapsed : 0);
-              totalBytes += regionBytes;
-            }
-          }
-
-          if(stats != null) {
-            stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
-          }
-
-        }
-
-        @Override
-        public void onFailure() {
-          long elapsed= System.nanoTime() - start;
-
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
-          }
-
-          if(stats != null) {
-            stats.endBucketCreate(regionCount, false, 0, elapsed);
-          }
-        }
-      });
-    }
-    
-    @Override
-    public boolean removeBucket(
-        InternalDistributedMember targetMember, int i, 
-        Map<String, Long> colocatedRegionBytes) {
-      boolean result = false;
-      long elapsed = 0;
-      long totalBytes = 0;
-      
-      
-      if(stats != null) {
-        stats.startBucketRemove(regionCount);
-      }
-      try {
-        long start = System.nanoTime();
-        result = delegate.removeBucket(targetMember, i,  
-            colocatedRegionBytes);
-        elapsed= System.nanoTime() - start;
-        if (result) {
-          if(logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
-          }
-          for (PartitionRebalanceDetailsImpl details : detailSet) {
-            String regionPath = details.getRegionPath();
-            Long lrb = colocatedRegionBytes.get(regionPath);
-            if (lrb != null) { // region could have gone away - esp during shutdow
-              long regionBytes = lrb.longValue();
-              //Only add the elapsed time to the leader region.
-              details.incRemoves(regionBytes, 
-                  details.getRegion().equals(leaderRegion) ? elapsed : 0);
-              totalBytes += regionBytes;
-            }
-          }
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} redundant bucket {} failed removal o{}", leaderRegion, i, targetMember);
-          }
-        }
-      } finally {
-        if(stats != null) {
-          stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
-        }
-      }
-      
-      return result;
-    }
-  
-    @Override
-    public boolean movePrimary(InternalDistributedMember source,
-        InternalDistributedMember target, int bucketId) {
-      boolean result = false;
-      long elapsed = 0;
-      
-      if(stats != null) {
-        stats.startPrimaryTransfer(regionCount);
-      }
-
-      try {
-        long start = System.nanoTime();
-        result = delegate.movePrimary(source, target, bucketId);
-        elapsed = System.nanoTime() - start;
-        if (result) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
-          }
-          for (PartitionRebalanceDetailsImpl details : detailSet) {
-            details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
-          }
-        } else {
-          if (logger.isDebugEnabled()) {
-            logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
-          }
-        }
-    } finally {
-      if(stats != null) {
-        stats.endPrimaryTransfer(regionCount, result, elapsed);
-      }
-    }
-      
-      return result;
-    }
-    
-    @Override
-    public void waitForOperations() {
-      delegate.waitForOperations();
-    }
-
-    public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
-      return this.detailSet;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
new file mode 100644
index 0000000..b7c172b
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImpl.java
@@ -0,0 +1,69 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import java.util.Map;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+
+public class BucketOperatorImpl implements BucketOperator {
+  // Performs bucket operations by delegating to the static helpers of PartitionedRegionRebalanceOp.
+  private final PartitionedRegion leaderRegion; // region handed to every delegated operation
+  private final boolean isRebalance; // forwarded to movePrimary and createRedundantBucket
+  private final boolean replaceOfflineData; // forwarded to createRedundantBucket only
+
+  public BucketOperatorImpl(PartitionedRegion leaderRegion, boolean isRebalance, boolean replaceOfflineData) {
+    this.leaderRegion = leaderRegion;
+    this.isRebalance = isRebalance;
+    this.replaceOfflineData = replaceOfflineData;
+  }
+
+  @Override
+  public boolean moveBucket(InternalDistributedMember source,
+      InternalDistributedMember target, int bucketId,
+      Map<String, Long> colocatedRegionBytes) {
+    // Tell the resource observer about the move first; colocatedRegionBytes is not used here.
+    InternalResourceManager.getResourceObserver().movingBucket(
+        leaderRegion, bucketId, source, target);
+    return PartitionedRegionRebalanceOp.moveBucketForRegion(source, target, bucketId, leaderRegion);
+  }
+
+  @Override
+  public boolean movePrimary(InternalDistributedMember source,
+      InternalDistributedMember target, int bucketId) {
+    // Tell the resource observer first; only the target is passed on to the helper.
+    InternalResourceManager.getResourceObserver().movingPrimary(
+        leaderRegion, bucketId, source, target);
+    return PartitionedRegionRebalanceOp.movePrimaryBucketForRegion(target, bucketId, leaderRegion, isRebalance);
+  }
+
+  @Override
+  public void createRedundantBucket(
+      InternalDistributedMember targetMember, int bucketId,
+      Map<String, Long> colocatedRegionBytes, Completion completion) {
+    boolean result = false;
+    try {
+      result = PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId,
+        leaderRegion, isRebalance, replaceOfflineData);
+    } finally { // always notify the completion; if the helper throws, result is still false => onFailure
+      if(result) {
+        completion.onSuccess();
+      } else {
+        completion.onFailure();
+      }
+    }
+  }
+
+  @Override
+  public void waitForOperations() {
+    //do nothing, all operations are synchronous
+  }
+
+  @Override
+  public boolean removeBucket(InternalDistributedMember targetMember, int bucketId,
+      Map<String, Long> colocatedRegionBytes) {
+    return PartitionedRegionRebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId,
+        leaderRegion);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
new file mode 100644
index 0000000..0d0edd2
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapper.java
@@ -0,0 +1,209 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
+import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class BucketOperatorWrapper implements BucketOperator {
+  private static final Logger logger = LogService.getLogger();
+  // Delegates each bucket operation to the wrapped BucketOperator while recording stats and per-region details.
+  private final BucketOperator delegate;
+  private final Set<PartitionRebalanceDetailsImpl> detailSet;
+  private final int regionCount; // detailSet.size() captured at construction time
+  private final ResourceManagerStats stats; // may be null; every use below is null-checked
+  private final PartitionedRegion leaderRegion; // only this region's details receive the elapsed time
+
+  public BucketOperatorWrapper(BucketOperator delegate, Set<PartitionRebalanceDetailsImpl> rebalanceDetails, ResourceManagerStats stats, PartitionedRegion leaderRegion) {
+    this.delegate = delegate;
+    this.detailSet = rebalanceDetails;
+    this.regionCount = detailSet.size();
+    this.stats = stats;
+    this.leaderRegion = leaderRegion;
+  }
+
+  @Override
+  public boolean moveBucket(InternalDistributedMember sourceMember, InternalDistributedMember targetMember, int id, Map<String, Long> colocatedRegionBytes) {
+    long start = System.nanoTime();
+    boolean result = false;
+    long elapsed = 0;
+    long totalBytes = 0;
+
+    if (stats != null) {
+      stats.startBucketTransfer(regionCount);
+    }
+    try {
+      result = delegate.moveBucket(sourceMember, targetMember, id, colocatedRegionBytes);
+      elapsed = System.nanoTime() - start;
+      if (result) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} bucket {} moved from {} to {}", leaderRegion, id, sourceMember, targetMember);
+        }
+        for (PartitionRebalanceDetailsImpl details : detailSet) {
+          String regionPath = details.getRegionPath();
+          Long regionBytes = colocatedRegionBytes.get(regionPath);
+          if (regionBytes != null) {
+            // only increment the elapsed time for the leader region
+            details.incTransfers(regionBytes.longValue(), details.getRegion().equals(leaderRegion) ? elapsed : 0);
+            totalBytes += regionBytes.longValue();
+          }
+        }
+      } else {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} bucket {} moved failed from {} to {}", leaderRegion, id, sourceMember, targetMember);
+        }
+      }
+    } finally { // pair every startBucketTransfer with an endBucketTransfer, even if the delegate throws
+      if (stats != null) {
+        stats.endBucketTransfer(regionCount, result, totalBytes, elapsed);
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public void createRedundantBucket(final InternalDistributedMember targetMember, final int i, final Map<String, Long> colocatedRegionBytes,
+      final Completion completion) {
+    // The caller's completion is wrapped so stats/details are recorded first and the caller is then notified.
+    if (stats != null) {
+      stats.startBucketCreate(regionCount);
+    }
+
+    final long start = System.nanoTime();
+    delegate.createRedundantBucket(targetMember, i, colocatedRegionBytes, new Completion() {
+
+      @Override
+      public void onSuccess() {
+        long totalBytes = 0;
+        long elapsed = System.nanoTime() - start;
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} redundant bucket {} created on {}", leaderRegion, i, targetMember);
+        }
+        for (PartitionRebalanceDetailsImpl details : detailSet) {
+          String regionPath = details.getRegionPath();
+          Long lrb = colocatedRegionBytes.get(regionPath);
+          if (lrb != null) { // region could have gone away - esp during shutdown
+            long regionBytes = lrb.longValue();
+            // Only add the elapsed time to the leader region.
+            details.incCreates(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
+            totalBytes += regionBytes;
+          }
+        }
+
+        if (stats != null) {
+          stats.endBucketCreate(regionCount, true, totalBytes, elapsed);
+        }
+        
+        //invoke onSuccess on the received completion callback
+        completion.onSuccess();
+      }
+
+      @Override
+      public void onFailure() {
+        long elapsed = System.nanoTime() - start;
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} redundant bucket {} failed creation on {}", leaderRegion, i, targetMember);
+        }
+
+        if (stats != null) {
+          stats.endBucketCreate(regionCount, false, 0, elapsed);
+        }
+        
+        //invoke onFailure on the received completion callback
+        completion.onFailure();
+      }
+    });
+  }
+
+  @Override
+  public boolean removeBucket(InternalDistributedMember targetMember, int i, Map<String, Long> colocatedRegionBytes) {
+    boolean result = false;
+    long elapsed = 0;
+    long totalBytes = 0;
+
+    if (stats != null) {
+      stats.startBucketRemove(regionCount);
+    }
+    try {
+      long start = System.nanoTime();
+      result = delegate.removeBucket(targetMember, i, colocatedRegionBytes);
+      elapsed = System.nanoTime() - start;
+      if (result) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} redundant bucket {} removed from {}", leaderRegion, i, targetMember);
+        }
+        for (PartitionRebalanceDetailsImpl details : detailSet) {
+          String regionPath = details.getRegionPath();
+          Long lrb = colocatedRegionBytes.get(regionPath);
+          if (lrb != null) { // region could have gone away - esp during shutdown
+            long regionBytes = lrb.longValue();
+            // Only add the elapsed time to the leader region.
+            details.incRemoves(regionBytes, details.getRegion().equals(leaderRegion) ? elapsed : 0);
+            totalBytes += regionBytes;
+          }
+        }
+      } else {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} redundant bucket {} failed removal on {}", leaderRegion, i, targetMember);
+        }
+      }
+    } finally { // pair every startBucketRemove with an endBucketRemove, even if the delegate throws
+      if (stats != null) {
+        stats.endBucketRemove(regionCount, result, totalBytes, elapsed);
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public boolean movePrimary(InternalDistributedMember source, InternalDistributedMember target, int bucketId) {
+    boolean result = false;
+    long elapsed = 0;
+
+    if (stats != null) {
+      stats.startPrimaryTransfer(regionCount);
+    }
+
+    try {
+      long start = System.nanoTime();
+      result = delegate.movePrimary(source, target, bucketId);
+      elapsed = System.nanoTime() - start;
+      if (result) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} primary bucket {} moved from {} to {}", leaderRegion, bucketId, source, target);
+        }
+        for (PartitionRebalanceDetailsImpl details : detailSet) {
+          details.incPrimaryTransfers(details.getRegion().equals(leaderRegion) ? elapsed : 0);
+        }
+      } else {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalancing {} primary bucket {} failed to move from {} to {}", leaderRegion, bucketId, source, target);
+        }
+      }
+    } finally { // pair every startPrimaryTransfer with an endPrimaryTransfer, even if the delegate throws
+      if (stats != null) {
+        stats.endPrimaryTransfer(regionCount, result, elapsed);
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public void waitForOperations() {
+    delegate.waitForOperations();
+  }
+
+  public Set<PartitionRebalanceDetailsImpl> getDetailSet() {
+    return this.detailSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index 26ebc16..4915037 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -16,6 +16,13 @@
  */
 package com.gemstone.gemfire.internal.cache.control;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +50,6 @@ import com.gemstone.gemfire.cache.DiskStoreFactory;
 import com.gemstone.gemfire.cache.EntryEvent;
 import com.gemstone.gemfire.cache.EvictionAction;
 import com.gemstone.gemfire.cache.EvictionAttributes;
-import com.gemstone.gemfire.cache.GemFireCache;
 import com.gemstone.gemfire.cache.LoaderHelper;
 import com.gemstone.gemfire.cache.PartitionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
@@ -63,11 +69,12 @@ import com.gemstone.gemfire.cache30.CacheTestCase;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.DistributedSystem;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 import com.gemstone.gemfire.internal.cache.BucketRegion;
 import com.gemstone.gemfire.internal.cache.ColocationHelper;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.InternalCache;
+import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceObserverAdapter;
@@ -972,6 +979,162 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
     return (DistributedMember) vm.invoke(createPrRegion);
   }
   
+  public void testRecoverRedundancyBalancingIfCreateBucketFails() {
+    recoverRedundancyBalancingIfCreateBucketFails(false); // simulate=false: stats and repaired redundancy are also checked
+  }
+  
+  public void recoverRedundancyBalancingIfCreateBucketFails(boolean simulate) {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    
+    // Create the region on vm0 only and put a single entry, so exactly one bucket exists.
+    final DistributedMember member1 = createPrRegion(vm0, "region1", 100, null);
+    
+    vm0.invoke(new SerializableRunnable("createSomeBuckets") {
+      
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region1");
+        for(int i = 0; i < 1; i++) {
+          region.put(Integer.valueOf(i), "A");
+        }
+      }
+    });
+    
+    // Expects one created bucket that has no redundant copy (i.e. one low-redundancy bucket).
+    SerializableRunnable checkRedundancy= new SerializableRunnable("checkRedundancy") {
+
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region1");
+        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+        assertEquals(1, details.getCreatedBucketCount());
+        assertEquals(0,details.getActualRedundantCopies());
+        assertEquals(1,details.getLowRedundancyBucketCount());
+      }
+    };
+    
+    vm0.invoke(checkRedundancy);
+        
+    //Now create the region in 2 more VMs
+    final DistributedMember member2 = createPrRegion(vm1, "region1", 100, null);
+    final DistributedMember member3 = createPrRegion(vm2, "region1", 100, null);
+    
+    vm0.invoke(checkRedundancy);
+    
+    //Inject a spied PRHARedundancyProvider on vm0 so that creating a backup bucket on member2 fails
+    vm0.invoke(new SerializableRunnable("inject failure") {
+      
+      @Override
+      public void run() {
+        GemFireCacheImpl cache = spy(getGemfireCache());
+        //set the spied cache instance
+        GemFireCacheImpl.setInstanceForTests(cache);
+
+        PartitionedRegion realRegion = (PartitionedRegion) cache.getRegion("region1");
+        PartitionedRegion spyRegion = spy(realRegion);
+        PRHARedundancyProvider redundancyProvider = spy(new PRHARedundancyProvider(spyRegion));   
+
+        Set<PartitionedRegion> parRegions = cache.getPartitionedRegions();
+        parRegions.remove(realRegion);
+        parRegions.add(spyRegion);
+
+        doReturn(spyRegion).when(cache).getRegion("region1");
+        doReturn(parRegions).when(cache).getPartitionedRegions();
+                
+        doReturn(redundancyProvider).when(spyRegion).getRedundancyProvider();
+        
+        //simulate create bucket fails on member2
+        doReturn(false).when(redundancyProvider).createBackupBucketOnMember(anyInt(), eq((InternalDistributedMember) member2), anyBoolean(), anyBoolean(), any(), anyBoolean());
+        
+      }
+    });
+        
+    //Now run a rebalance on vm0 against the spied cache and verify its results
+    vm0.invoke(new SerializableRunnable("rebalance") {
+      
+      public void run() {
+        //Retrieve spied cache from GemFireCacheImpl and not using getCache()
+        GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+        RegionFilter filter = new FilterByPath(null, null);
+        RebalanceOperationImpl factory = new RebalanceOperationImpl(cache, false, filter);
+        factory.start();
+        RebalanceResults results = null;
+        try {
+          results = factory.getResults(MAX_WAIT, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          Assert.fail("Interrupted waiting on rebalance", e);
+        } catch (TimeoutException e) {
+          Assert.fail("Timeout waiting on rebalance", e);
+        }
+        assertEquals(1, results.getTotalBucketCreatesCompleted());
+        assertEquals(0, results.getTotalPrimaryTransfersCompleted());
+        assertEquals(0, results.getTotalBucketTransferBytes());
+        assertEquals(0, results.getTotalBucketTransfersCompleted());
+        Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
+        assertEquals(1, detailSet.size());
+        PartitionRebalanceInfo details = detailSet.iterator().next();
+        assertEquals(1, details.getBucketCreatesCompleted());
+        assertEquals(0, details.getPrimaryTransfersCompleted());
+        assertEquals(0, details.getBucketTransferBytes());
+        assertEquals(0, details.getBucketTransfersCompleted());
+        // member1 keeps the primary, member2 must hold no bucket, member3 holds the redundant copy.
+        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
+        assertEquals(3, afterDetails.size());
+        for(PartitionMemberInfo memberDetails: afterDetails) {
+          if(memberDetails.getDistributedMember().equals(member1)) {
+            assertEquals(1, memberDetails.getBucketCount());
+            assertEquals(1, memberDetails.getPrimaryCount());
+          } else if(memberDetails.getDistributedMember().equals(member2)) {
+            assertEquals(0, memberDetails.getBucketCount());
+            assertEquals(0, memberDetails.getPrimaryCount());
+          } else if(memberDetails.getDistributedMember().equals(member3)) {
+            assertEquals(1, memberDetails.getBucketCount());
+            assertEquals(0, memberDetails.getPrimaryCount());
+          }
+        }
+        
+        if(!simulate) {
+          ResourceManagerStats stats = cache.getResourceManager().getStats();
+          
+          assertEquals(0, stats.getRebalancesInProgress());
+          assertEquals(1, stats.getRebalancesCompleted());
+          assertEquals(0, stats.getRebalanceBucketCreatesInProgress());
+          assertEquals(results.getTotalBucketCreatesCompleted(), stats.getRebalanceBucketCreatesCompleted());
+          assertEquals(1, stats.getRebalanceBucketCreatesFailed());
+        }
+      }
+    });
+    
+    if(!simulate) {
+      SerializableRunnable checkRedundancyFixed = new SerializableRunnable("checkLowRedundancy") {
+
+        public void run() {
+          Cache cache = getCache();
+          Region region = cache.getRegion("region1");
+          PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+          assertEquals(1, details.getCreatedBucketCount());
+          assertEquals(1,details.getActualRedundantCopies());
+          assertEquals(0,details.getLowRedundancyBucketCount());
+        }
+      };
+
+      vm0.invoke(checkRedundancyFixed);
+      vm1.invoke(checkRedundancyFixed);
+      vm2.invoke(checkRedundancyFixed);
+    }
+    // NOTE(review): the reset below is not in a finally block, so it is skipped if an earlier step throws.
+    //reset the cache instance that was replaced via setInstanceForTests above
+    vm0.invoke(new SerializableRunnable() {   
+      @Override
+      public void run() throws Exception {
+        GemFireCacheImpl.setInstanceForTests(null);
+      }
+    });
+  }
+  
   public void testRecoverRedundancyColocatedRegionsSimulation() {
     recoverRedundancyColocatedRegions(true);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
new file mode 100644
index 0000000..d3643da
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorImplTest.java
@@ -0,0 +1,145 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("*.UnitTest")
+@PrepareForTest({ InternalResourceManager.class, PartitionedRegionRebalanceOp.class })
+public class BucketOperatorImplTest {
+
+  private InternalResourceManager.ResourceObserver resourceObserver;
+
+  private BucketOperatorImpl operator;
+
+  @Mock
+  private PartitionedRegion region;
+  private boolean isRebalance = true;
+  private boolean replaceOfflineData = true;
+
+  private Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
+  private int bucketId = 1;
+  private InternalDistributedMember sourceMember, targetMember;
+  @Mock
+  Completion completion;
+
+  @Before
+  public void setup() throws UnknownHostException {
+    resourceObserver = spy(new InternalResourceManager.ResourceObserverAdapter());
+
+    PowerMockito.mockStatic(InternalResourceManager.class);
+    when(InternalResourceManager.getResourceObserver()).thenReturn(resourceObserver);
+
+    operator = new BucketOperatorImpl(region, isRebalance, replaceOfflineData);
+
+    sourceMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+    targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.2"), 1);
+  }
+
+  @After
+  public void after() {
+    reset(resourceObserver);
+  }
+
+  @Test
+  public void moveBucketShouldDelegateToParRegRebalanceOpMoveBucketForRegion() throws UnknownHostException {
+    PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+    when(PartitionedRegionRebalanceOp.moveBucketForRegion(sourceMember, targetMember, bucketId, region)).thenReturn(true);
+
+    operator.moveBucket(sourceMember, targetMember, bucketId, colocatedRegionBytes);
+
+    verify(resourceObserver, times(1)).movingBucket(region, bucketId, sourceMember, targetMember);
+
+    PowerMockito.verifyStatic(times(1));
+    PartitionedRegionRebalanceOp.moveBucketForRegion(sourceMember, targetMember, bucketId, region);
+  }
+
+  @Test
+  public void movePrimaryShouldDelegateToParRegRebalanceOpMovePrimaryBucketForRegion() throws UnknownHostException {
+    PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+    when(PartitionedRegionRebalanceOp.movePrimaryBucketForRegion(targetMember, bucketId, region, isRebalance)).thenReturn(true);
+
+    operator.movePrimary(sourceMember, targetMember, bucketId);
+
+    verify(resourceObserver, times(1)).movingPrimary(region, bucketId, sourceMember, targetMember);
+
+    PowerMockito.verifyStatic(times(1));
+    PartitionedRegionRebalanceOp.movePrimaryBucketForRegion(targetMember, bucketId, region, isRebalance);
+  }
+
+  @Test
+  public void createBucketShouldDelegateToParRegRebalanceOpCreateRedundantBucketForRegion() throws UnknownHostException {
+    PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+    when(PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData)).thenReturn(true);
+
+    operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
+
+    PowerMockito.verifyStatic(times(1));
+    PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData);
+  }
+
+  @Test
+  public void createBucketShouldInvokeOnSuccessIfCreateBucketSucceeds() {
+    PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+    when(PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData)).thenReturn(true);
+
+    operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
+
+    PowerMockito.verifyStatic(times(1));
+    PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData);
+
+    verify(completion, times(1)).onSuccess();
+  }
+
+  @Test
+  public void createBucketShouldInvokeOnFailureIfCreateBucketFails() {
+    PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+    when(PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData)).thenReturn(false); //return false for create fail
+
+    operator.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completion);
+
+    PowerMockito.verifyStatic(times(1));
+    PartitionedRegionRebalanceOp.createRedundantBucketForRegion(targetMember, bucketId, region, isRebalance, replaceOfflineData);
+
+    verify(completion, times(1)).onFailure();
+  }
+
+  @Test
+  public void removeBucketShouldDelegateToParRegRebalanceOpRemoveRedundantBucketForRegion() {
+    PowerMockito.mockStatic(PartitionedRegionRebalanceOp.class);
+    when(PartitionedRegionRebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId, region)).thenReturn(true);
+
+    operator.removeBucket(targetMember, bucketId, colocatedRegionBytes);
+
+    PowerMockito.verifyStatic(times(1));
+    PartitionedRegionRebalanceOp.removeRedundantBucketForRegion(targetMember, bucketId, region);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b35af269/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
new file mode 100644
index 0000000..3678430
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/partitioned/rebalance/BucketOperatorWrapperTest.java
@@ -0,0 +1,97 @@
+package com.gemstone.gemfire.internal.cache.partitioned.rebalance;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.PartitionRebalanceDetailsImpl;
+import com.gemstone.gemfire.internal.cache.control.ResourceManagerStats;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperator.Completion;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorImpl;
+import com.gemstone.gemfire.internal.cache.partitioned.rebalance.BucketOperatorWrapper;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class BucketOperatorWrapperTest {
+  
+  public BucketOperatorWrapper createBucketOperatorWrapper(BucketOperatorImpl delegate) {
+    
+    ResourceManagerStats stats = mock(ResourceManagerStats.class);
+    doNothing().when(stats).startBucketCreate(anyInt());
+    doNothing().when(stats).endBucketCreate(anyInt(), anyBoolean(), anyLong(), anyLong());
+    
+    Set<PartitionRebalanceDetailsImpl> rebalanceDetails = new HashSet<PartitionRebalanceDetailsImpl>();
+    PartitionedRegion region = mock(PartitionedRegion.class);
+    
+    BucketOperatorWrapper wrapper = new BucketOperatorWrapper(delegate, rebalanceDetails, stats, region);
+    
+    return wrapper;
+  }
+  
+  @Test
+  public void bucketWrapperShouldInvokeOnFailureWhenCreateBucketFails() throws UnknownHostException {
+    BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);   
+    BucketOperatorWrapper wrapper = createBucketOperatorWrapper(delegate);
+        
+    Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
+    int bucketId = 1;
+    InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+        
+    doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) {
+        //index 3 (the 4th argument) is the Completion handed to the mocked delegate's createRedundantBucket; fire its failure callback
+        ((Completion) invocation.getArguments()[3]).onFailure();
+        return null;
+      }
+    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+    
+    Completion completionSentToWrapper = mock(Completion.class);
+    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+    
+    verify(completionSentToWrapper, times(1)).onFailure();
+  }
+  
+  @Test
+  public void bucketWrapperShouldInvokeOnSuccessWhenCreateBucketSucceeds() throws UnknownHostException {
+    BucketOperatorImpl delegate = mock(BucketOperatorImpl.class);
+    BucketOperatorWrapper wrapper = createBucketOperatorWrapper(delegate);
+
+    Map<String, Long> colocatedRegionBytes = new HashMap<String, Long>();
+    int bucketId = 1;
+    InternalDistributedMember targetMember = new InternalDistributedMember(InetAddress.getByName("127.0.0.1"), 1);
+
+    doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) {
+        //index 3 (the 4th argument) is the Completion handed to the mocked delegate's createRedundantBucket; fire its success callback
+        ((Completion) invocation.getArguments()[3]).onSuccess();
+        return null;
+      }
+    }).when(delegate).createRedundantBucket(eq(targetMember), eq(bucketId), eq(colocatedRegionBytes), any(Completion.class));
+
+    Completion completionSentToWrapper = mock(Completion.class);
+    wrapper.createRedundantBucket(targetMember, bucketId, colocatedRegionBytes, completionSentToWrapper);
+
+    verify(completionSentToWrapper, times(1)).onSuccess();
+  }
+}