Posted to commits@geode.apache.org by es...@apache.org on 2019/04/18 19:59:46 UTC

[geode] branch develop updated: GEODE-6630: move allBucketsRecoveredFromDisk count down latch (#3477)

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 49757e7  GEODE-6630: move allBucketsRecoveredFromDisk count down latch (#3477)
49757e7 is described below

commit 49757e7e3706854c0775180d47b3d2825579f3f3
Author: pivotal-eshu <es...@pivotal.io>
AuthorDate: Thu Apr 18 12:59:30 2019 -0700

    GEODE-6630: move allBucketsRecoveredFromDisk count down latch (#3477)
    
    * Rename RedundancyLogger to PersistentBucketRecoverer
    * Move allBucketsRecoveredFromDisk count down latch from PRHARedundancyProvider to PersistentBucketRecoverer.
    * Provide utility methods for using the count down latch.
---
 .../internal/cache/PRHARedundancyProvider.java     |  90 ++++++-----------
 ...yLogger.java => PersistentBucketRecoverer.java} | 111 +++++++++++++++++----
 .../internal/cache/PRHARedundancyProviderTest.java |  22 ++--
 .../partitioned/PersistentBucketRecovererTest.java |  69 +++++++++++++
 4 files changed, 205 insertions(+), 87 deletions(-)
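
In short: the CountDownLatch that PRHARedundancyProvider used to expose directly now lives inside PersistentBucketRecoverer, and callers go through small utility methods instead of touching the latch field. Below is a minimal editorial sketch of that surface, condensed from the diff that follows; the sketch class name is made up and this is not an excerpt of the commit.

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    // Editorial sketch of the latch bookkeeping that moved out of
    // PRHARedundancyProvider (see the real PersistentBucketRecoverer below).
    class RecovererLatchSketch {
      private final CountDownLatch allBucketsRecoveredFromDisk;

      RecovererLatchSketch(int proxyBuckets) {
        // one count per proxy bucket of the persistent leader region
        allBucketsRecoveredFromDisk = new CountDownLatch(proxyBuckets);
      }

      void countDown() {
        // called from each bucket recovery thread's finally block
        allBucketsRecoveredFromDisk.countDown();
      }

      void countDown(int size) {
        // releases, in bulk, the buckets this member never hosted
        while (size-- > 0) {
          allBucketsRecoveredFromDisk.countDown();
        }
      }

      boolean hasRecoveryCompleted() {
        return allBucketsRecoveredFromDisk.getCount() == 0;
      }

      void await(long timeout, TimeUnit unit) throws InterruptedException {
        // the real method also checks the region's CancelCriterion between waits
        // and restores the interrupt flag instead of propagating the exception
        while (!allBucketsRecoveredFromDisk.await(timeout, unit)) {
          // keep waiting until every bucket is accounted for
        }
      }
    }

In the real class the latch keeps the name allBucketsRecoveredFromDisk, and the await variants additionally handle region cancellation and interrupts, as shown in the PersistentBucketRecoverer hunks further down.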

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index 2942154..26c3406 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -75,8 +74,8 @@ import org.apache.geode.internal.cache.partitioned.PRLoad;
 import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
 import org.apache.geode.internal.cache.partitioned.PartitionRegionInfoImpl;
 import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
 import org.apache.geode.internal.cache.partitioned.RecoveryRunnable;
-import org.apache.geode.internal.cache.partitioned.RedundancyLogger;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
 import org.apache.geode.internal.cache.partitioned.rebalance.CompositeDirector;
@@ -144,12 +143,10 @@ public class PRHARedundancyProvider {
   private final Object shutdownLock = new Object();
   private boolean shutdown = false;
 
-  volatile CountDownLatch allBucketsRecoveredFromDisk;
-
   /**
    * Used to consolidate logging for bucket regions waiting on other members to come online.
    */
-  private RedundancyLogger redundancyLogger = null;
+  private volatile PersistentBucketRecoverer persistentBucketRecoverer = null;
 
   /**
    * Constructor for PRHARedundancyProvider.
@@ -1685,6 +1682,10 @@ public class PRHARedundancyProvider {
     final ProxyBucketRegion[] proxyBucketArray =
         persistentLeader.getRegionAdvisor().getProxyBucketArray();
 
+    if (proxyBucketArray.length == 0) {
+      throw new IllegalStateException("Unexpected empty proxy bucket array");
+    }
+
     for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
       proxyBucket.initializePersistenceAdvisor();
     }
@@ -1707,14 +1708,9 @@ public class PRHARedundancyProvider {
     ArrayList<ProxyBucketRegion> bucketsHostedLocally =
         new ArrayList<ProxyBucketRegion>(proxyBucketArray.length);
 
+    createPersistentBucketRecoverer(proxyBucketArray.length);
 
     /*
-     * Start the redundancy logger before recovering any proxy buckets.
-     */
-    startRedundancyLogger(proxyBucketArray.length);
-
-    allBucketsRecoveredFromDisk = new CountDownLatch(proxyBucketArray.length);
-    /*
      * Spawn a separate thread for bucket that we previously hosted to recover that bucket.
      *
      * That thread will get to the point at which it has determined that at least one member
@@ -1734,7 +1730,6 @@ public class PRHARedundancyProvider {
       if (proxyBucket.getPersistenceAdvisor().wasHosting()) {
         final RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) {
 
-
           @Override
           public void run() {
             // Fix for 44551 - make sure that we always count down
@@ -1742,7 +1737,9 @@ public class PRHARedundancyProvider {
             try {
               super.run();
             } finally {
-              allBucketsRecoveredFromDisk.countDown();
+              if (getPersistentBucketRecoverer() != null) {
+                getPersistentBucketRecoverer().countDown();
+              }
             }
           }
 
@@ -1773,8 +1770,8 @@ public class PRHARedundancyProvider {
         proxyBucket.recoverFromDiskRecursively();
       }
     } finally {
-      for (final ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
-        allBucketsRecoveredFromDisk.countDown();
+      if (getPersistentBucketRecoverer() != null) {
+        getPersistentBucketRecoverer().countDown(bucketsNotHostedLocally.size());
       }
     }
 
@@ -1784,13 +1781,13 @@ public class PRHARedundancyProvider {
     // }
   }
 
-  void startRedundancyLogger(int proxyBuckets) {
-    if (proxyBuckets > 0) {
-      redundancyLogger = new RedundancyLogger(this);
-      Thread loggingThread = new LoggingThread(
-          "RedundancyLogger for region " + this.prRegion.getName(), false, this.redundancyLogger);
-      loggingThread.start();
-    }
+  private void createPersistentBucketRecoverer(int proxyBuckets) {
+    persistentBucketRecoverer = new PersistentBucketRecoverer(this, proxyBuckets);
+    persistentBucketRecoverer.startLoggingThread();
+  }
+
+  PersistentBucketRecoverer getPersistentBucketRecoverer() {
+    return persistentBucketRecoverer;
   }
 
   /**
@@ -1981,24 +1978,9 @@ public class PRHARedundancyProvider {
    * whichever happens first.
    */
   protected void waitForPersistentBucketRecoveryOrClose() {
-    CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
-    if (recoveryLatch != null) {
-      boolean interrupted = false;
-      while (true) {
-        try {
-          this.prRegion.getCancelCriterion().checkCancelInProgress(null);
-          boolean done = recoveryLatch.await(
-              PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION, TimeUnit.MILLISECONDS);
-          if (done) {
-            break;
-          }
-        } catch (InterruptedException e) {
-          interrupted = true;
-        }
-      }
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
+    if (getPersistentBucketRecoverer() != null) {
+      getPersistentBucketRecoverer().await(
+          PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION, TimeUnit.MILLISECONDS);
     }
 
     List<PartitionedRegion> colocatedRegions =
@@ -2013,20 +1995,8 @@ public class PRHARedundancyProvider {
    * currently being closed.
    */
   protected void waitForPersistentBucketRecovery() {
-    CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
-    if (recoveryLatch != null) {
-      boolean interrupted = false;
-      while (true) {
-        try {
-          recoveryLatch.await();
-          break;
-        } catch (InterruptedException e) {
-          interrupted = true;
-        }
-      }
-      if (interrupted) {
-        Thread.currentThread().interrupt();
-      }
+    if (getPersistentBucketRecoverer() != null) {
+      getPersistentBucketRecoverer().await();
     }
   }
 
@@ -2035,7 +2005,8 @@ public class PRHARedundancyProvider {
       return false;
     }
 
-    if (allBucketsRecoveredFromDisk != null && allBucketsRecoveredFromDisk.getCount() > 0) {
+    if (getPersistentBucketRecoverer() != null
+        && !getPersistentBucketRecoverer().hasRecoveryCompleted()) {
       return false;
     }
 
@@ -2044,12 +2015,11 @@ public class PRHARedundancyProvider {
 
     for (PartitionedRegion region : colocatedRegions.values()) {
       PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider();
-      if (redundancyProvider.allBucketsRecoveredFromDisk != null
-          && redundancyProvider.allBucketsRecoveredFromDisk.getCount() > 0) {
+      if (redundancyProvider.getPersistentBucketRecoverer() != null &&
+          !redundancyProvider.getPersistentBucketRecoverer().hasRecoveryCompleted()) {
         return false;
       }
     }
-
     return true;
   }
 
@@ -2323,10 +2293,6 @@ public class PRHARedundancyProvider {
     }
   }
 
-  public CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
-    return allBucketsRecoveredFromDisk;
-  }
-
   private ThreadsMonitoring getThreadMonitorObj() {
     DistributionManager distributionManager = this.prRegion.getDistributionManager();
     if (distributionManager != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
similarity index 81%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
index f8b98ac..5654c61 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.Logger;
 
@@ -38,17 +39,17 @@ import org.apache.geode.internal.cache.ProxyBucketRegion;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.cache.persistence.PersistentStateListener;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThread;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.process.StartupStatus;
 import org.apache.geode.internal.util.TransformUtils;
 
 /**
  * Consolidates logging during the recovery of ProxyRegionBuckets that are not hosted by this
- * member. This logger is meant to run in its own thread and utilizes the PRHARedundancyProvider's
- * count down latch in order to determine when it is finished.
- *
+ * member. The logger is meant to run in its own thread.
+ * It uses a count down latch to determine whether the recovery is finished.
  */
-public class RedundancyLogger extends RecoveryRunnable implements PersistentStateListener {
+public class PersistentBucketRecoverer extends RecoveryRunnable implements PersistentStateListener {
 
   private static final Logger logger = LogService.getLogger();
 
@@ -71,14 +72,15 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
 
 
   /**
-   * Creates a new RedundancyLogger.
+   * Creates a new PersistentBucketRecoverer.
    *
    */
-  public RedundancyLogger(PRHARedundancyProvider prhaRedundancyProvider) {
+  public PersistentBucketRecoverer(PRHARedundancyProvider prhaRedundancyProvider,
+      int proxyBuckets) {
     super(prhaRedundancyProvider);
     PartitionedRegion baseRegion = ColocationHelper.getLeaderRegion(redundancyProvider.prRegion);
     List<PartitionedRegion> colocatedRegions =
-        ColocationHelper.getColocatedChildRegions(baseRegion);
+        getColocatedChildRegions(baseRegion);
     List<RegionStatus> allRegions = new ArrayList<RegionStatus>(colocatedRegions.size() + 1);
     if (baseRegion.getDataPolicy().withPersistence()) {
       allRegions.add(new RegionStatus(baseRegion));
@@ -89,12 +91,22 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
       }
     }
 
-    this.regions = Collections.unmodifiableList(allRegions);
+    regions = Collections.unmodifiableList(allRegions);
+    allBucketsRecoveredFromDisk = new CountDownLatch(proxyBuckets);
+    membershipChanged = true;
+    addListeners();
 
+  }
 
-    this.allBucketsRecoveredFromDisk = redundancyProvider.getAllBucketsRecoveredFromDiskLatch();
-    this.membershipChanged = true;
-    addListeners();
+  List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion baseRegion) {
+    return ColocationHelper.getColocatedChildRegions(baseRegion);
+  }
+
+  public void startLoggingThread() {
+    Thread loggingThread = new LoggingThread(
+        "PersistentBucketRecoverer for region " + redundancyProvider.prRegion.getName(), false,
+        this);
+    loggingThread.start();
   }
 
   /**
@@ -124,7 +136,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
 
 
   /**
-   * Add this RedundancyLogger as a persistence listener to all the region's bucket advisors.
+   * Add this PersistentBucketRecoverer as a persistence listener to all the region's bucket
+   * advisors.
    */
   private void addListeners() {
     for (RegionStatus region : regions) {
@@ -133,7 +146,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
   }
 
   /**
-   * Removes this RedundancyLogger as a persistence listener from all the region's bucket advisors.
+   * Removes this PersistentBucketRecoverer as a persistence listener from all the region's bucket
+   * advisors.
    */
   private void removeListeners() {
     for (RegionStatus region : regions) {
@@ -151,7 +165,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
   public void run2() {
     try {
       boolean warningLogged = false;
-      while (this.allBucketsRecoveredFromDisk.getCount() > 0) {
+      while (getLatchCount() > 0) {
         int sleepMillis = SLEEP_PERIOD;
         // reduce the first log time from 15secs so that higher layers can
         // report sooner to user
@@ -196,7 +210,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
    */
   private class RegionStatus {
     /**
-     * The persistent identifier of the member running this RedundancyLogger.
+     * The persistent identifier of the member running this PersistentBucketRecoverer.
      */
     private final PersistentMemberID thisMember;
 
@@ -223,13 +237,13 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
 
     public void removeListeners() {
       for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
-        proxyBucket.getPersistenceAdvisor().removeListener(RedundancyLogger.this);
+        proxyBucket.getPersistenceAdvisor().removeListener(PersistentBucketRecoverer.this);
       }
     }
 
     public void addListeners() {
       for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
-        proxyBucket.getPersistenceAdvisor().addListener(RedundancyLogger.this);
+        proxyBucket.getPersistenceAdvisor().addListener(PersistentBucketRecoverer.this);
       }
     }
 
@@ -336,8 +350,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
       Map<PersistentMemberID, Set<Integer>> offlineMembers = getMembersToWaitFor(true);
       Map<PersistentMemberID, Set<Integer>> allMembersToWaitFor = getMembersToWaitFor(false);
 
-      boolean thereAreBucketsToBeRecovered =
-          (RedundancyLogger.this.allBucketsRecoveredFromDisk.getCount() > 0);
+      boolean thereAreBucketsToBeRecovered = (getLatchCount() > 0);
 
       /*
        * Log any offline members the region is waiting for.
@@ -401,4 +414,64 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
       return allWaitingBuckets;
     }
   }
+
+  public void await(long timeout, TimeUnit unit) {
+    boolean interrupted = false;
+    while (true) {
+      try {
+        redundancyProvider.prRegion.getCancelCriterion().checkCancelInProgress(null);
+        boolean done = allBucketsRecoveredFromDisk.await(timeout, unit);
+        if (done) {
+          break;
+        }
+      } catch (InterruptedException e) {
+        interrupted = true;
+      }
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void await() {
+    boolean interrupted = false;
+    while (true) {
+      try {
+        getAllBucketsRecoveredFromDiskLatch().await();
+        break;
+      } catch (InterruptedException e) {
+        interrupted = true;
+      }
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void countDown() {
+    allBucketsRecoveredFromDisk.countDown();
+  }
+
+  public void countDown(int size) {
+    while (size > 0) {
+      allBucketsRecoveredFromDisk.countDown();
+      --size;
+    }
+  }
+
+  public boolean hasRecoveryCompleted() {
+    if (getLatchCount() > 0) {
+      return false;
+    }
+    return true;
+  }
+
+  long getLatchCount() {
+    return allBucketsRecoveredFromDisk.getCount();
+  }
+
+  CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
+    return allBucketsRecoveredFromDisk;
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index e650cbc..c5fcff5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -15,15 +15,17 @@
 package org.apache.geode.internal.cache;
 
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.CountDownLatch;
+import static org.mockito.Mockito.when;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
+
 
 public class PRHARedundancyProviderTest {
   private PRHARedundancyProvider provider;
@@ -31,21 +33,29 @@ public class PRHARedundancyProviderTest {
   @Before
   public void setup() {
     PartitionedRegion partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+    InternalCache cache = mock(InternalCache.class);
+    DistributedRegion root = mock(DistributedRegion.class);
+    when(partitionedRegion.getCache()).thenReturn(cache);
+    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
     provider = spy(new PRHARedundancyProvider(partitionedRegion));
   }
 
   @Test
-  public void waitForPersistentBucketRecoveryProceedsWhenAllBucketsRecoveredFromDiskLatchIsNull() {
+  public void waitForPersistentBucketRecoveryProceedsWhenPersistentBucketRecovererLatchIsNotSet() {
+    PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
+    doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+
     provider.waitForPersistentBucketRecovery();
   }
 
   @Test
   public void waitForPersistentBucketRecoveryProceedsAfterLatchCountDown() throws Exception {
-    provider.allBucketsRecoveredFromDisk = spy(new CountDownLatch(1));
-    provider.allBucketsRecoveredFromDisk.countDown();
+    PersistentBucketRecoverer recoverer = spy(new PersistentBucketRecoverer(provider, 1));
+    doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+    provider.getPersistentBucketRecoverer().countDown();
 
     provider.waitForPersistentBucketRecovery();
 
-    verify(provider.allBucketsRecoveredFromDisk).await();
+    verify(recoverer).await();
   }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
new file mode 100644
index 0000000..12168cd
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PRHARedundancyProvider;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+
+public class PersistentBucketRecovererTest {
+  private PartitionedRegion partitionedRegion;
+  private InternalCache cache;
+  private DistributedRegion root;
+  private PRHARedundancyProvider provider;
+
+  @Before
+  public void setUp() {
+    partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+    cache = mock(InternalCache.class);
+    root = mock(DistributedRegion.class);
+    when(partitionedRegion.getCache()).thenReturn(cache);
+    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
+    provider = new PRHARedundancyProvider(partitionedRegion);
+  }
+
+  @Test
+  public void allBucketsRecoveredFromDiskCountDownLatchIsSet() {
+    int numberOfProxyBuckets = 5;
+
+    PersistentBucketRecoverer recoverer =
+        new PersistentBucketRecoverer(provider, numberOfProxyBuckets);
+
+    assertThat(recoverer.getAllBucketsRecoveredFromDiskLatch()).isNotNull();
+    assertThat(recoverer.getLatchCount()).isEqualTo(numberOfProxyBuckets);
+  }
+
+  @Test
+  public void latchCanBeCountedDown() {
+    int numberOfProxyBuckets = 5;
+    PersistentBucketRecoverer recoverer =
+        new PersistentBucketRecoverer(provider, numberOfProxyBuckets);
+
+    assertThat(recoverer.getLatchCount()).isEqualTo(numberOfProxyBuckets);
+    recoverer.countDown(numberOfProxyBuckets);
+
+    recoverer.await();
+  }
+}
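
A further test along the same lines could exercise the timed await. This is an editorial, hypothetical example and not part of the commit; it assumes the deep-stubbed PartitionedRegion from setUp() is enough to satisfy the cancellation check inside await(long, TimeUnit), and it would need an import of java.util.concurrent.TimeUnit:

      @Test
      public void timedAwaitReturnsOnceLatchReachesZero() {
        int numberOfProxyBuckets = 5;
        PersistentBucketRecoverer recoverer =
            new PersistentBucketRecoverer(provider, numberOfProxyBuckets);

        recoverer.countDown(numberOfProxyBuckets);

        // the latch is already at zero, so the timed await should return promptly
        recoverer.await(100, TimeUnit.MILLISECONDS);

        assertThat(recoverer.hasRecoveryCompleted()).isTrue();
      }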