You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by as...@apache.org on 2015/10/22 23:46:20 UTC

incubator-geode git commit: GEODE-464: Fix Auto-Rebalancer test race condition

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-409 37f77a90a -> b49025701


GEODE-464: Fix Auto-Rebalancer test race condition

Remove dependency on static instance of Cache in AutoBalancer. Also, as
identified in f801d1c, the tests and constructors needed to be refactored.
This allows easier injection of dependencies and deterministic test execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b4902570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b4902570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b4902570

Branch: refs/heads/feature/GEODE-409
Commit: b49025701685008a5c73b3c01b864da01ea3195d
Parents: 37f77a9
Author: Ashvin Agrawal <as...@apache.org>
Authored: Thu Oct 22 14:07:40 2015 -0700
Committer: Ashvin Agrawal <as...@apache.org>
Committed: Thu Oct 22 14:39:44 2015 -0700

----------------------------------------------------------------------
 .../gemfire/cache/util/AutoBalancer.java        | 158 ++++++++-------
 ...erAuditorInvocationIntegrationJUnitTest.java |  80 --------
 .../util/AutoBalancerIntegrationJUnitTest.java  | 180 ++++-------------
 .../cache/util/AutoBalancerJUnitTest.java       | 202 +++++++++++--------
 4 files changed, 242 insertions(+), 378 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b4902570/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
index 00ebc5f..bcc5608 100644
--- a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
+++ b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
@@ -17,6 +17,7 @@ import org.quartz.CronExpression;
 import org.springframework.scheduling.support.CronSequenceGenerator;
 
 import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.Declarable;
 import com.gemstone.gemfire.cache.GemFireCache;
 import com.gemstone.gemfire.cache.control.RebalanceOperation;
@@ -51,7 +52,7 @@ import com.gemstone.gemfire.internal.logging.LogService;
  * <P>
  * {@link AutoBalancer} can be controlled using the following configurations
  * <OL>
- * <LI> {@link AutoBalancer#SCHEDULE}
+ * <LI>{@link AutoBalancer#SCHEDULE}
  * <LI>TBD THRESHOLDS
  * 
  * @author Ashvin Agrawal
@@ -119,14 +120,25 @@ public class AutoBalancer implements Declarable {
 
   public static final Object AUTO_BALANCER_LOCK = "__AUTO_B_LOCK";
 
-  private AuditScheduler scheduler = new CronScheduler();
-  private OOBAuditor auditor = new SizeBasedOOBAuditor();
-  private TimeProvider clock = new SystemClockTimeProvider();
-  private CacheOperationFacade cacheFacade = new GeodeCacheFacade();
-  private AtomicBoolean isLockAcquired = new AtomicBoolean(false);
+  private final AuditScheduler scheduler;
+  private final OOBAuditor auditor;
+  private final TimeProvider clock;
+  private final CacheOperationFacade cacheFacade;
 
   private static final Logger logger = LogService.getLogger();
 
+  public AutoBalancer() {
+    this(null, null, null, null);
+  }
+
+  public AutoBalancer(AuditScheduler scheduler, OOBAuditor auditor, TimeProvider clock,
+      CacheOperationFacade cacheFacade) {
+    this.cacheFacade = cacheFacade == null ? new GeodeCacheFacade() : cacheFacade;
+    this.scheduler = scheduler == null ? new CronScheduler() : scheduler;
+    this.auditor = auditor == null ? new SizeBasedOOBAuditor(this.cacheFacade) : auditor;
+    this.clock = clock == null ? new SystemClockTimeProvider() : clock;
+  }
+
   @Override
   public void init(Properties props) {
     if (logger.isDebugEnabled()) {
@@ -215,10 +227,16 @@ public class AutoBalancer implements Declarable {
    * <LI>updates auto-balance stat
    * <LI>release lock
    */
-  class SizeBasedOOBAuditor implements OOBAuditor {
+  static class SizeBasedOOBAuditor implements OOBAuditor {
     private int sizeThreshold = DEFAULT_SIZE_THRESHOLD_PERCENT;
     private int sizeMinimum = DEFAULT_MINIMUM_SIZE;
 
+    final CacheOperationFacade cache;
+
+    public SizeBasedOOBAuditor(CacheOperationFacade cache) {
+      this.cache = cache;
+    }
+
     @Override
     public void init(Properties props) {
       if (logger.isDebugEnabled()) {
@@ -243,24 +261,16 @@ public class AutoBalancer implements Declarable {
 
     @Override
     public void execute() {
-      if (!isLockAcquired.get()) {
-        synchronized (isLockAcquired) {
-          if (!isLockAcquired.get()) {
-            boolean result = cacheFacade.acquireAutoBalanceLock();
-            if (result) {
-              isLockAcquired.set(true);
-            } else {
-              if (logger.isDebugEnabled()) {
-                logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster");
-              }
-              return;
-            }
-          }
+      boolean result = cache.acquireAutoBalanceLock();
+      if (!result) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster");
         }
+        return;
       }
 
-      cacheFacade.incrementAttemptCounter();
-      boolean result = needsRebalancing();
+      cache.incrementAttemptCounter();
+      result = needsRebalancing();
       if (!result) {
         if (logger.isDebugEnabled()) {
           logger.debug("Rebalancing is not needed");
@@ -268,7 +278,7 @@ public class AutoBalancer implements Declarable {
         return;
       }
 
-      cacheFacade.rebalance();
+      cache.rebalance();
     }
 
     /**
@@ -284,13 +294,13 @@ public class AutoBalancer implements Declarable {
      */
     boolean needsRebalancing() {
       // test cluster level status
-      long transferSize = cacheFacade.getTotalTransferSize();
+      long transferSize = cache.getTotalTransferSize();
       if (transferSize <= sizeMinimum) {
         return false;
       }
 
-      Map<PartitionedRegion, InternalPRInfo> details = cacheFacade.getRegionMemberDetails();
-      long totalSize = cacheFacade.getTotalDataSize(details);
+      Map<PartitionedRegion, InternalPRInfo> details = cache.getRegionMemberDetails();
+      long totalSize = cache.getTotalDataSize(details);
 
       if (totalSize > 0) {
         int transferPercent = (int) ((100.0 * transferSize) / totalSize);
@@ -318,6 +328,18 @@ public class AutoBalancer implements Declarable {
    * auto-balancing
    */
   static class GeodeCacheFacade implements CacheOperationFacade {
+    private final AtomicBoolean isLockAcquired = new AtomicBoolean(false);
+
+    private GemFireCacheImpl cache;
+
+    public GeodeCacheFacade() {
+      this(null);
+    }
+
+    public GeodeCacheFacade(GemFireCacheImpl cache) {
+      this.cache = cache;
+    }
+
     @Override
     public Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails() {
       GemFireCacheImpl cache = getCache();
@@ -354,12 +376,12 @@ public class AutoBalancer implements Declarable {
         RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().simulate();
         RebalanceResults result = operation.getResults();
         if (logger.isDebugEnabled()) {
-          logger.debug("Rebalance estimate: RebalanceResultsImpl [TotalBucketCreateBytes="
-              + result.getTotalBucketCreateBytes() + ", TotalBucketCreatesCompleted="
-              + result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes="
-              + result.getTotalBucketTransferBytes() + ", TotalBucketTransfersCompleted="
-              + result.getTotalBucketTransfersCompleted() + ", TotalPrimaryTransfersCompleted="
-              + result.getTotalPrimaryTransfersCompleted() + "]");
+          logger.debug(
+              "Rebalance estimate: RebalanceResultsImpl [TotalBucketCreateBytes=" + result.getTotalBucketCreateBytes()
+                  + ", TotalBucketCreatesCompleted=" + result.getTotalBucketCreatesCompleted()
+                  + ", TotalBucketTransferBytes=" + result.getTotalBucketTransferBytes()
+                  + ", TotalBucketTransfersCompleted=" + result.getTotalBucketTransfersCompleted()
+                  + ", TotalPrimaryTransfersCompleted=" + result.getTotalPrimaryTransfersCompleted() + "]");
         }
         return result.getTotalBucketTransferBytes();
       } catch (CancellationException e) {
@@ -390,9 +412,8 @@ public class AutoBalancer implements Declarable {
             + result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes="
             + result.getTotalBucketTransferBytes() + ", TotalBucketTransferTime=" + result.getTotalBucketTransferTime()
             + ", TotalBucketTransfersCompleted=" + +result.getTotalBucketTransfersCompleted()
-            + ", TotalPrimaryTransferTime=" + result.getTotalPrimaryTransferTime()
-            + ", TotalPrimaryTransfersCompleted=" + result.getTotalPrimaryTransfersCompleted() + ", TotalTime="
-            + result.getTotalTime() + "]");
+            + ", TotalPrimaryTransferTime=" + result.getTotalPrimaryTransferTime() + ", TotalPrimaryTransfersCompleted="
+            + result.getTotalPrimaryTransfersCompleted() + ", TotalTime=" + result.getTotalTime() + "]");
       } catch (CancellationException e) {
         logger.info("Error rebalancing the cluster", e);
       } catch (InterruptedException e) {
@@ -401,22 +422,44 @@ public class AutoBalancer implements Declarable {
     }
 
     GemFireCacheImpl getCache() {
-      GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
       if (cache == null) {
-        throw new IllegalStateException("Missing cache instance.");
+        synchronized (this) {
+          if (cache == null) {
+            cache = GemFireCacheImpl.getInstance();
+            if (cache == null) {
+              throw new IllegalStateException("Missing cache instance.");
+            }
+          }
+        }
+      }
+      if (cache.isClosed()) {
+        throw new CacheClosedException();
       }
       return cache;
     }
 
     @Override
     public boolean acquireAutoBalanceLock() {
-      DistributedLockService dls = getDLS();
+      if (!isLockAcquired.get()) {
+        synchronized (isLockAcquired) {
+          if (!isLockAcquired.get()) {
+            DistributedLockService dls = getDLS();
 
-      boolean result = dls.lock(AUTO_BALANCER_LOCK, 0, -1);
-      if (logger.isDebugEnabled()) {
-        logger.debug("Grabbed AutoBalancer lock? " + result);
+            boolean result = dls.lock(AUTO_BALANCER_LOCK, 0, -1);
+            if (result) {
+              isLockAcquired.set(true);
+              if (logger.isDebugEnabled()) {
+                logger.debug("Grabbed AutoBalancer lock");
+              }
+            } else {
+              if (logger.isDebugEnabled()) {
+                logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster");
+              }
+            }
+          }
+        }
       }
-      return result;
+      return isLockAcquired.get();
     }
 
     @Override
@@ -471,41 +514,10 @@ public class AutoBalancer implements Declarable {
     long getTotalTransferSize();
   }
 
-  /**
-   * Test hook to inject custom triggers
-   */
-  void setScheduler(AuditScheduler trigger) {
-    logger.info("Setting custom AuditScheduler");
-    this.scheduler = trigger;
-  }
-
-  /**
-   * Test hook to inject custom auditors
-   */
-  void setOOBAuditor(OOBAuditor auditor) {
-    logger.info("Setting custom Auditor");
-    this.auditor = auditor;
-  }
-
   OOBAuditor getOOBAuditor() {
     return auditor;
   }
 
-  /**
-   * Test hook to inject a clock
-   */
-  void setTimeProvider(TimeProvider clock) {
-    logger.info("Setting custom TimeProvider");
-    this.clock = clock;
-  }
-
-  /**
-   * Test hook to inject a Cache operation facade
-   */
-  public void setCacheOperationFacade(CacheOperationFacade facade) {
-    this.cacheFacade = facade;
-  }
-
   public CacheOperationFacade getCacheOperationFacade() {
     return this.cacheFacade;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b4902570/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerAuditorInvocationIntegrationJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerAuditorInvocationIntegrationJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerAuditorInvocationIntegrationJUnitTest.java
deleted file mode 100755
index bd6a3ff..0000000
--- a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerAuditorInvocationIntegrationJUnitTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.gemstone.gemfire.cache.util;
-
-import static org.junit.Assert.*;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.jmock.Expectations;
-import org.jmock.Mockery;
-import org.jmock.lib.concurrent.Synchroniser;
-import org.jmock.lib.legacy.ClassImposteriser;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.util.AutoBalancer.OOBAuditor;
-import com.gemstone.gemfire.cache.util.AutoBalancer.TimeProvider;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-
-/**
- * IntegrationTest for AuditorInvocation in AutoBalancer. 
- * 
- * <p>AutoBalancer should:<br>
- * 1) be refactored to extract out all inner-classes and inner-interfaces<br>
- * 2) have constructor changed to accept every collaborator as an argument<br>
- * 3) then this test can correctly use mocking without any real threads to wait on
- * 
- * <p>Extracted from AutoBalancerJUnitTest
- */
-@Category(IntegrationTest.class)
-public class AutoBalancerAuditorInvocationIntegrationJUnitTest {
-
-  Mockery mockContext;
-
-  @Before
-  public void setupMock() {
-    mockContext = new Mockery() {
-      {
-        setImposteriser(ClassImposteriser.INSTANCE);
-        setThreadingPolicy(new Synchroniser());
-      }
-    };
-  }
-
-  @After
-  public void validateMock() {
-    mockContext.assertIsSatisfied();
-    mockContext = null;
-  }
-
-  @Test
-  public void testAuditorInvocation() throws InterruptedException {
-    int count = 0;
-
-    final OOBAuditor mockAuditor = mockContext.mock(OOBAuditor.class);
-    final TimeProvider mockClock = mockContext.mock(TimeProvider.class);
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockAuditor).init(with(any(Properties.class)));
-        exactly(2).of(mockAuditor).execute();
-        allowing(mockClock).currentTimeMillis();
-        will(returnValue(950L));
-      }
-    });
-
-    Properties props = AutoBalancerJUnitTest.getBasicConfig();
-
-    assertEquals(0, count);
-    AutoBalancer autoR = new AutoBalancer();
-    autoR.setOOBAuditor(mockAuditor);
-    autoR.setTimeProvider(mockClock);
-
-    // the trigger should get invoked after 50 milliseconds
-    autoR.init(props);
-    
-    // TODO: this sleep should NOT be here -- use Awaitility to await a condition instead or use mocking to avoid this altogether
-    TimeUnit.MILLISECONDS.sleep(120); // removal causes failure in validateMock
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b4902570/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerIntegrationJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerIntegrationJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerIntegrationJUnitTest.java
index 38b7bf9..cff9d69 100755
--- a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerIntegrationJUnitTest.java
+++ b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerIntegrationJUnitTest.java
@@ -1,22 +1,18 @@
 package com.gemstone.gemfire.cache.util;
 
-import static com.jayway.awaitility.Awaitility.*;
-import static java.util.concurrent.TimeUnit.*;
-import static org.junit.Assert.*;
-import static org.hamcrest.Matchers.*;
+import static com.jayway.awaitility.Awaitility.await;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.jmock.Expectations;
-import org.jmock.Mockery;
-import org.jmock.api.Invocation;
-import org.jmock.lib.action.CustomAction;
-import org.jmock.lib.concurrent.Synchroniser;
-import org.jmock.lib.legacy.ClassImposteriser;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,38 +27,20 @@ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.locks.DLockService;
 import com.gemstone.gemfire.internal.HostStatSampler;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.partitioned.InternalPRInfo;
-import com.gemstone.gemfire.internal.cache.partitioned.LoadProbe;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
 
 /**
- * IntegrationTests for AutoBalancer that include usage of Cache, StatSampler 
- * and DistributedLockService. Some collaborators may be mocked while others
- * are real.
- * 
- * <p>Extracted from AutoBalancerJUnitTest
+ * IntegrationTests for AutoBalancer that include usage of Cache, StatSampler
+ * and DistributedLockService.
  */
 @Category(IntegrationTest.class)
 public class AutoBalancerIntegrationJUnitTest {
-  
+
   private static final int TIMEOUT_SECONDS = 5;
 
   private GemFireCacheImpl cache;
-  private Mockery mockContext;
 
   @Before
-  public void setupMock() {
-    mockContext = new Mockery() {
-      {
-        setImposteriser(ClassImposteriser.INSTANCE);
-        setThreadingPolicy(new Synchroniser());
-      }
-    };
-  }
-  
-  @Before
   public void setUpCacheAndDLS() {
     cache = createBasicCache();
   }
@@ -75,7 +53,7 @@ public class AutoBalancerIntegrationJUnitTest {
 
     if (cache != null && !cache.isClosed()) {
       try {
-        final HostStatSampler statSampler = ((InternalDistributedSystem)cache.getDistributedSystem()).getStatSampler();
+        final HostStatSampler statSampler = ((InternalDistributedSystem) cache.getDistributedSystem()).getStatSampler();
         cache.close();
         // wait for the stat sampler to stand down
         await().atMost(TIMEOUT_SECONDS, SECONDS).until(isAlive(statSampler), equalTo(false));
@@ -84,80 +62,41 @@ public class AutoBalancerIntegrationJUnitTest {
       }
     }
   }
-  
-  @After
-  public void validateMock() {
-    mockContext.assertIsSatisfied();
-    mockContext = null;
-  }
 
   @Test
   public void testAutoRebalaceStatsOnLockSuccess() throws InterruptedException {
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockCacheFacade).acquireAutoBalanceLock();
-        will(returnValue(true));
-        oneOf(mockCacheFacade).incrementAttemptCounter();
-        will(new CustomAction("increment stat") {
-          public Object invoke(Invocation invocation) throws Throwable {
-            new GeodeCacheFacade().incrementAttemptCounter();
-            return null;
-          }
-        });
-        allowing(mockCacheFacade);
-      }
-    });
-
     assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
     AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
     balancer.getOOBAuditor().execute();
-    
     assertEquals(1, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
   }
 
   @Test
   public void testAutoRebalaceStatsOnLockFailure() throws InterruptedException {
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockCacheFacade).acquireAutoBalanceLock();
-        will(returnValue(false));
-      }
-    });
-
+    acquireLockInDifferentThread(1);
     assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
     AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
     balancer.getOOBAuditor().execute();
-
     assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
   }
-  
+
   @Test
   public void testAutoBalanceStatUpdate() {
     assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
     new GeodeCacheFacade().incrementAttemptCounter();
-    
     assertEquals(1, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
   }
-  
+
   @Test
   public void testLockSuccess() throws InterruptedException {
-    final AtomicBoolean acquiredAutoBalanceLock = new AtomicBoolean(true);
-    
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        CacheOperationFacade cacheFacade = new GeodeCacheFacade();
-        acquiredAutoBalanceLock.set(cacheFacade.acquireAutoBalanceLock());
-      }
-    });
-    thread.start();
-    
-    await().atMost(TIMEOUT_SECONDS, SECONDS).untilTrue(acquiredAutoBalanceLock);
-    
+    acquireLockInDifferentThread(1);
+    DistributedLockService dls = new GeodeCacheFacade().getDLS();
+    assertFalse(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
+  }
+
+  @Test
+  public void canReacquireLock() throws InterruptedException {
+    acquireLockInDifferentThread(2);
     DistributedLockService dls = new GeodeCacheFacade().getDLS();
     assertFalse(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
   }
@@ -168,7 +107,7 @@ public class AutoBalancerIntegrationJUnitTest {
     assertTrue(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
 
     final AtomicBoolean success = new AtomicBoolean(true);
-    
+
     Thread thread = new Thread(new Runnable() {
       @Override
       public void run() {
@@ -178,7 +117,7 @@ public class AutoBalancerIntegrationJUnitTest {
     });
     thread.start();
     thread.join();
-    
+
     assertFalse(success.get());
   }
 
@@ -213,57 +152,6 @@ public class AutoBalancerIntegrationJUnitTest {
     cache.loadCacheXml(new ByteArrayInputStream(configStr.getBytes()));
   }
 
-  @Test
-  public void testFacadeCollectMemberDetails2Regions() {
-    final GemFireCacheImpl mockCache = mockContext.mock(GemFireCacheImpl.class);
-
-    final PartitionedRegion mockR1 = mockContext.mock(PartitionedRegion.class, "r1");
-    final PartitionedRegion mockR2 = mockContext.mock(PartitionedRegion.class, "r2");
-    final HashSet<PartitionedRegion> regions = new HashSet<>();
-    regions.add(mockR1);
-    regions.add(mockR2);
-
-    final PRHARedundancyProvider mockRedundancyProviderR1 = mockContext.mock(PRHARedundancyProvider.class, "prhaR1");
-    final InternalPRInfo mockR1PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR1");
-
-    final PRHARedundancyProvider mockRedundancyProviderR2 = mockContext.mock(PRHARedundancyProvider.class, "prhaR2");
-    final InternalPRInfo mockR2PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR2");
-
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockCache).getPartitionedRegions();
-        will(returnValue(regions));
-        exactly(2).of(mockCache).getResourceManager();
-        will(returnValue(cache.getResourceManager()));
-        allowing(mockR1).getFullPath();
-        oneOf(mockR1).getRedundancyProvider();
-        will(returnValue(mockRedundancyProviderR1));
-        allowing(mockR2).getFullPath();
-        oneOf(mockR2).getRedundancyProvider();
-        will(returnValue(mockRedundancyProviderR2));
-
-        oneOf(mockRedundancyProviderR1).buildPartitionedRegionInfo(with(true), with(any(LoadProbe.class)));
-        will(returnValue(mockR1PRInfo));
-
-        oneOf(mockRedundancyProviderR2).buildPartitionedRegionInfo(with(true), with(any(LoadProbe.class)));
-        will(returnValue(mockR2PRInfo));
-      }
-    });
-
-    GeodeCacheFacade facade = new GeodeCacheFacade() {
-      @Override
-      GemFireCacheImpl getCache() {
-        return mockCache;
-      }
-    };
-
-    Map<PartitionedRegion, InternalPRInfo> map = facade.getRegionMemberDetails();
-    assertNotNull(map);
-    assertEquals(2, map.size());
-    assertEquals(map.get(mockR1), mockR1PRInfo);
-    assertEquals(map.get(mockR2), mockR2PRInfo);
-  }
-
   private GemFireCacheImpl createBasicCache() {
     return (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").create();
   }
@@ -276,4 +164,22 @@ public class AutoBalancerIntegrationJUnitTest {
       }
     };
   }
+
+  private void acquireLockInDifferentThread(final int num) throws InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(num);
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        CacheOperationFacade cacheFacade = new GeodeCacheFacade();
+        for (int i = 0; i < num; i++) {
+          boolean result = cacheFacade.acquireAutoBalanceLock();
+          if (result) {
+            latch.countDown();
+          }
+        }
+      }
+    });
+    thread.start();
+    assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b4902570/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
index 1eca3c2..5aa0b8d 100644
--- a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
+++ b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
@@ -1,15 +1,23 @@
 package com.gemstone.gemfire.cache.util;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.jmock.Expectations;
 import org.jmock.Mockery;
 import org.jmock.Sequence;
+import org.jmock.api.Invocation;
+import org.jmock.lib.action.CustomAction;
 import org.jmock.lib.concurrent.Synchroniser;
 import org.jmock.lib.legacy.ClassImposteriser;
 import org.junit.After;
@@ -27,10 +35,13 @@ import com.gemstone.gemfire.cache.util.AutoBalancer.CacheOperationFacade;
 import com.gemstone.gemfire.cache.util.AutoBalancer.GeodeCacheFacade;
 import com.gemstone.gemfire.cache.util.AutoBalancer.OOBAuditor;
 import com.gemstone.gemfire.cache.util.AutoBalancer.SizeBasedOOBAuditor;
+import com.gemstone.gemfire.cache.util.AutoBalancer.TimeProvider;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
 import com.gemstone.gemfire.internal.cache.partitioned.InternalPRInfo;
+import com.gemstone.gemfire.internal.cache.partitioned.LoadProbe;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
 /**
@@ -40,6 +51,11 @@ import com.gemstone.gemfire.test.junit.categories.UnitTest;
 public class AutoBalancerJUnitTest {
   Mockery mockContext;
 
+  CacheOperationFacade mockCacheFacade;
+  OOBAuditor mockAuditor;
+  AuditScheduler mockScheduler;
+  TimeProvider mockClock;
+
   @Before
   public void setupMock() {
     mockContext = new Mockery() {
@@ -48,6 +64,11 @@ public class AutoBalancerJUnitTest {
         setThreadingPolicy(new Synchroniser());
       }
     };
+
+    mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    mockAuditor = mockContext.mock(OOBAuditor.class);
+    mockScheduler = mockContext.mock(AuditScheduler.class);
+    mockClock = mockContext.mock(TimeProvider.class);
   }
 
   @After
@@ -58,7 +79,6 @@ public class AutoBalancerJUnitTest {
 
   @Test
   public void testLockStatExecuteInSequence() throws InterruptedException {
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
     final Sequence sequence = mockContext.sequence("sequence");
     mockContext.checking(new Expectations() {
       {
@@ -73,33 +93,12 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
-    balancer.getOOBAuditor().execute();
-  }
-
-  @Test
-  public void testReusePreAcquiredLock() throws InterruptedException {
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockCacheFacade).acquireAutoBalanceLock();
-        will(returnValue(true));
-        exactly(2).of(mockCacheFacade).incrementAttemptCounter();
-        exactly(2).of(mockCacheFacade).getTotalTransferSize();
-        will(returnValue(0L));
-      }
-    });
-
-    AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
-    balancer.getOOBAuditor().execute();
+    AutoBalancer balancer = new AutoBalancer(null, null, null, mockCacheFacade);
     balancer.getOOBAuditor().execute();
   }
 
   @Test
   public void testAcquireLockAfterReleasedRemotely() throws InterruptedException {
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
     final Sequence sequence = mockContext.sequence("sequence");
     mockContext.checking(new Expectations() {
       {
@@ -115,15 +114,13 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
+    AutoBalancer balancer = new AutoBalancer(null, null, null, mockCacheFacade);
     balancer.getOOBAuditor().execute();
     balancer.getOOBAuditor().execute();
   }
 
   @Test
   public void testFailExecuteIfLockedElsewhere() throws InterruptedException {
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
     mockContext.checking(new Expectations() {
       {
         oneOf(mockCacheFacade).acquireAutoBalanceLock();
@@ -132,40 +129,7 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    Thread thread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        AutoBalancer balancer = new AutoBalancer();
-        balancer.setCacheOperationFacade(mockCacheFacade);
-        balancer.getOOBAuditor().execute();
-      }
-    });
-    thread.start();
-    thread.join();
-  }
-
-  @Test
-  public void testFailExecuteIfBalanced() throws InterruptedException {
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
-    mockContext.checking(new Expectations() {
-      {
-        oneOf(mockCacheFacade).acquireAutoBalanceLock();
-        will(returnValue(true));
-        never(mockCacheFacade).rebalance();
-        oneOf(mockCacheFacade).incrementAttemptCounter();
-      }
-    });
-
-    AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
-
-    SizeBasedOOBAuditor auditor = balancer.new SizeBasedOOBAuditor() {
-      @Override
-      boolean needsRebalancing() {
-        return false;
-      }
-    };
-    balancer.setOOBAuditor(auditor);
+    AutoBalancer balancer = new AutoBalancer(null, null, null, mockCacheFacade);
     balancer.getOOBAuditor().execute();
   }
 
@@ -181,7 +145,6 @@ public class AutoBalancerJUnitTest {
     final long totalSize = 1000L;
 
     final Map<PartitionedRegion, InternalPRInfo> details = new HashMap<>();
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
     mockContext.checking(new Expectations() {
       {
         allowing(mockCacheFacade).getRegionMemberDetails();
@@ -200,8 +163,7 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
+    AutoBalancer balancer = new AutoBalancer(null, null, null, mockCacheFacade);
     Properties config = getBasicConfig();
     config.put(AutoBalancer.MINIMUM_SIZE, "10");
     balancer.init(config);
@@ -218,7 +180,6 @@ public class AutoBalancerJUnitTest {
   public void testOOBWhenAboveThresholdButBelowMin() {
     final long totalSize = 1000L;
 
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
     mockContext.checking(new Expectations() {
       {
         // first run
@@ -233,8 +194,7 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
+    AutoBalancer balancer = new AutoBalancer(null, null, null, mockCacheFacade);
     Properties config = getBasicConfig();
     config.put(AutoBalancer.MINIMUM_SIZE, "" + (totalSize * 5));
     balancer.init(config);
@@ -252,7 +212,6 @@ public class AutoBalancerJUnitTest {
     final long totalSize = 1000L;
 
     final Map<PartitionedRegion, InternalPRInfo> details = new HashMap<>();
-    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
     mockContext.checking(new Expectations() {
       {
         allowing(mockCacheFacade).getRegionMemberDetails();
@@ -274,8 +233,7 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    AutoBalancer balancer = new AutoBalancer();
-    balancer.setCacheOperationFacade(mockCacheFacade);
+    AutoBalancer balancer = new AutoBalancer(null, null, null, mockCacheFacade);
     Properties config = getBasicConfig();
     config.put(AutoBalancer.MINIMUM_SIZE, "10");
     balancer.init(config);
@@ -355,8 +313,6 @@ public class AutoBalancerJUnitTest {
     props.put(AutoBalancer.SCHEDULE, someSchedule);
     props.put(AutoBalancer.SIZE_THRESHOLD_PERCENT, 17);
 
-    final AuditScheduler mockScheduler = mockContext.mock(AuditScheduler.class);
-    final OOBAuditor mockAuditor = mockContext.mock(OOBAuditor.class);
     mockContext.checking(new Expectations() {
       {
         oneOf(mockScheduler).init(someSchedule);
@@ -364,10 +320,7 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    AutoBalancer autoR = new AutoBalancer();
-    autoR.setScheduler(mockScheduler);
-    autoR.setOOBAuditor(mockAuditor);
-
+    AutoBalancer autoR = new AutoBalancer(mockScheduler, mockAuditor, null, null);
     autoR.init(props);
   }
 
@@ -404,6 +357,8 @@ public class AutoBalancerJUnitTest {
 
     mockContext.checking(new Expectations() {
       {
+        oneOf(mockCache).isClosed();
+        will(returnValue(false));
         oneOf(mockCache).getResourceManager();
         will(returnValue(mockRM));
         oneOf(mockRM).createRebalanceFactory();
@@ -424,12 +379,7 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    GeodeCacheFacade facade = new GeodeCacheFacade() {
-      @Override
-      GemFireCacheImpl getCache() {
-        return mockCache;
-      }
-    };
+    GeodeCacheFacade facade = new GeodeCacheFacade(mockCache);
 
     return facade;
   }
@@ -446,22 +396,71 @@ public class AutoBalancerJUnitTest {
     final GemFireCacheImpl mockCache = mockContext.mock(GemFireCacheImpl.class);
     mockContext.checking(new Expectations() {
       {
+        oneOf(mockCache).isClosed();
+        will(returnValue(false));
         oneOf(mockCache).getPartitionedRegions();
         will(returnValue(new HashSet<PartitionedRegion>()));
       }
     });
 
-    GeodeCacheFacade facade = new GeodeCacheFacade() {
-      @Override
-      GemFireCacheImpl getCache() {
-        return mockCache;
-      }
-    };
+    GeodeCacheFacade facade = new GeodeCacheFacade(mockCache);
 
     assertEquals(0, facade.getRegionMemberDetails().size());
   }
 
   @Test
+  public void testFacadeCollectMemberDetails2Regions() {
+    final GemFireCacheImpl mockCache = mockContext.mock(GemFireCacheImpl.class);
+    final InternalResourceManager mockRM = mockContext.mock(InternalResourceManager.class);
+    final LoadProbe mockProbe = mockContext.mock(LoadProbe.class);
+
+    final PartitionedRegion mockR1 = mockContext.mock(PartitionedRegion.class, "r1");
+    final PartitionedRegion mockR2 = mockContext.mock(PartitionedRegion.class, "r2");
+    final HashSet<PartitionedRegion> regions = new HashSet<>();
+    regions.add(mockR1);
+    regions.add(mockR2);
+
+    final PRHARedundancyProvider mockRedundancyProviderR1 = mockContext.mock(PRHARedundancyProvider.class, "prhaR1");
+    final InternalPRInfo mockR1PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR1");
+
+    final PRHARedundancyProvider mockRedundancyProviderR2 = mockContext.mock(PRHARedundancyProvider.class, "prhaR2");
+    final InternalPRInfo mockR2PRInfo = mockContext.mock(InternalPRInfo.class, "prInforR2");
+
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCache).isClosed();
+        will(returnValue(false));
+        oneOf(mockCache).getPartitionedRegions();
+        will(returnValue(regions));
+        exactly(2).of(mockCache).getResourceManager();
+        will(returnValue(mockRM));
+        exactly(2).of(mockRM).getLoadProbe();
+        will(returnValue(mockProbe));
+        allowing(mockR1).getFullPath();
+        oneOf(mockR1).getRedundancyProvider();
+        will(returnValue(mockRedundancyProviderR1));
+        allowing(mockR2).getFullPath();
+        oneOf(mockR2).getRedundancyProvider();
+        will(returnValue(mockRedundancyProviderR2));
+
+        oneOf(mockRedundancyProviderR1).buildPartitionedRegionInfo(with(true), with(any(LoadProbe.class)));
+        will(returnValue(mockR1PRInfo));
+
+        oneOf(mockRedundancyProviderR2).buildPartitionedRegionInfo(with(true), with(any(LoadProbe.class)));
+        will(returnValue(mockR2PRInfo));
+      }
+    });
+
+    GeodeCacheFacade facade = new GeodeCacheFacade(mockCache);
+
+    Map<PartitionedRegion, InternalPRInfo> map = facade.getRegionMemberDetails();
+    assertNotNull(map);
+    assertEquals(2, map.size());
+    assertEquals(map.get(mockR1), mockR1PRInfo);
+    assertEquals(map.get(mockR2), mockR2PRInfo);
+  }
+
+  @Test
   public void testFacadeTotalBytes2Regions() {
     final PartitionedRegion mockR1 = mockContext.mock(PartitionedRegion.class, "r1");
     final PartitionedRegion mockR2 = mockContext.mock(PartitionedRegion.class, "r2");
@@ -514,6 +513,33 @@ public class AutoBalancerJUnitTest {
     assertEquals(123 + 74 + 3475, facade.getTotalDataSize(details));
   }
 
+  @Test
+  public void testAuditorInvocation() throws InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(3);
+
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockAuditor).init(with(any(Properties.class)));
+        exactly(2).of(mockAuditor).execute();
+        allowing(mockClock).currentTimeMillis();
+        will(new CustomAction("returnTime") {
+          @Override
+          public Object invoke(Invocation invocation) throws Throwable {
+            latch.countDown();
+            return 990L;
+          }
+        });
+      }
+    });
+
+    Properties props = AutoBalancerJUnitTest.getBasicConfig();
+
+    assertEquals(3, latch.getCount());
+    AutoBalancer autoR = new AutoBalancer(null, mockAuditor, mockClock, null);
+    autoR.init(props);
+    assertTrue(latch.await(1, TimeUnit.SECONDS));
+  }
+
   static Properties getBasicConfig() {
     Properties props = new Properties();
     // every second schedule