You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/04/17 22:29:18 UTC

[geode] branch feature/GEODE-6630 updated: fix a review comment.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-6630
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6630 by this push:
     new 7edf667  fix a review comment.
7edf667 is described below

commit 7edf667dde099c4de77622fdafe28d28e366b974
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Apr 17 15:28:09 2019 -0700

    fix a review comment.
---
 .../internal/cache/PRHARedundancyProvider.java     | 59 ++++++++++------------
 ...yLogger.java => PersistentBucketRecoverer.java} | 39 ++++++++------
 .../internal/cache/PRHARedundancyProviderTest.java | 45 +++++------------
 .../partitioned/PersistentBucketRecovererTest.java | 48 ++++++++++++++++++
 4 files changed, 112 insertions(+), 79 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index f519914..7f951fb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -75,8 +75,8 @@ import org.apache.geode.internal.cache.partitioned.PRLoad;
 import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
 import org.apache.geode.internal.cache.partitioned.PartitionRegionInfoImpl;
 import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
 import org.apache.geode.internal.cache.partitioned.RecoveryRunnable;
-import org.apache.geode.internal.cache.partitioned.RedundancyLogger;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
 import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
 import org.apache.geode.internal.cache.partitioned.rebalance.CompositeDirector;
@@ -144,12 +144,10 @@ public class PRHARedundancyProvider {
   private final Object shutdownLock = new Object();
   private boolean shutdown = false;
 
-  volatile CountDownLatch allBucketsRecoveredFromDisk;
-
   /**
    * Used to consolidate logging for bucket regions waiting on other members to come online.
    */
-  private RedundancyLogger redundancyLogger = null;
+  private volatile PersistentBucketRecoverer persistentBucketRecoverer = null;
 
   /**
    * Constructor for PRHARedundancyProvider.
@@ -1715,7 +1713,11 @@ public class PRHARedundancyProvider {
     /*
      * Start the redundancy logger before recovering any proxy buckets.
      */
-    createRedundancyLoggerAndStartLoggingThread(proxyBucketArray.length);
+    persistentBucketRecoverer = createPersistentBucketRecoverer(proxyBucketArray.length);
+    Thread loggingThread = new LoggingThread(
+        "PersistentBucketRecoverer for region " + prRegion.getName(), false,
+        persistentBucketRecoverer);
+    loggingThread.start();
 
     /*
      * Spawn a separate thread for bucket that we previously hosted to recover that bucket.
@@ -1745,7 +1747,10 @@ public class PRHARedundancyProvider {
             try {
               super.run();
             } finally {
-              allBucketsRecoveredFromDisk.countDown();
+              CountDownLatch recoveryLatch = getRecoveryLatch();
+              if (recoveryLatch != null) {
+                recoveryLatch.countDown();
+              }
             }
           }
 
@@ -1777,7 +1782,10 @@ public class PRHARedundancyProvider {
       }
     } finally {
       for (final ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
-        allBucketsRecoveredFromDisk.countDown();
+        CountDownLatch recoveryLatch = getRecoveryLatch();
+        if (recoveryLatch != null) {
+          recoveryLatch.countDown();
+        }
       }
     }
 
@@ -1787,24 +1795,17 @@ public class PRHARedundancyProvider {
     // }
   }
 
-  void createRedundancyLoggerAndStartLoggingThread(int proxyBuckets) {
-    redundancyLogger = createRedundancyLogger();
-    allBucketsRecoveredFromDisk = createAllBucketsRecoveredFromDisk(proxyBuckets);
-    Thread loggingThread = createRedundancyLoggingThread();
-    loggingThread.start();
-  }
-
-  LoggingThread createRedundancyLoggingThread() {
-    return new LoggingThread(
-        "RedundancyLogger for region " + prRegion.getName(), false, redundancyLogger);
+  CountDownLatch getRecoveryLatch() {
+    return getPersistentBucketRecoverer() == null ? null
+        : getPersistentBucketRecoverer().getAllBucketsRecoveredFromDiskLatch();
   }
 
-  CountDownLatch createAllBucketsRecoveredFromDisk(int proxyBuckets) {
-    return new CountDownLatch(proxyBuckets);
+  private PersistentBucketRecoverer createPersistentBucketRecoverer(int proxyBuckets) {
+    return new PersistentBucketRecoverer(this, proxyBuckets);
   }
 
-  RedundancyLogger createRedundancyLogger() {
-    return new RedundancyLogger(this);
+  PersistentBucketRecoverer getPersistentBucketRecoverer() {
+    return persistentBucketRecoverer;
   }
 
   /**
@@ -1995,7 +1996,7 @@ public class PRHARedundancyProvider {
    * whichever happens first.
    */
   protected void waitForPersistentBucketRecoveryOrClose() {
-    CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
+    CountDownLatch recoveryLatch = getRecoveryLatch();
     if (recoveryLatch != null) {
       boolean interrupted = false;
       while (true) {
@@ -2027,7 +2028,7 @@ public class PRHARedundancyProvider {
    * currently being closed.
    */
   protected void waitForPersistentBucketRecovery() {
-    CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
+    CountDownLatch recoveryLatch = getRecoveryLatch();
     if (recoveryLatch != null) {
       boolean interrupted = false;
       while (true) {
@@ -2048,8 +2049,8 @@ public class PRHARedundancyProvider {
     if (!ColocationHelper.checkMembersColocation(this.prRegion, this.prRegion.getMyId())) {
       return false;
     }
-
-    if (allBucketsRecoveredFromDisk != null && allBucketsRecoveredFromDisk.getCount() > 0) {
+    CountDownLatch recoveryLatch = getRecoveryLatch();
+    if (recoveryLatch != null && recoveryLatch.getCount() > 0) {
       return false;
     }
 
@@ -2058,8 +2059,8 @@ public class PRHARedundancyProvider {
 
     for (PartitionedRegion region : colocatedRegions.values()) {
       PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider();
-      if (redundancyProvider.allBucketsRecoveredFromDisk != null
-          && redundancyProvider.allBucketsRecoveredFromDisk.getCount() > 0) {
+      recoveryLatch = redundancyProvider.getRecoveryLatch();
+      if (recoveryLatch != null && recoveryLatch.getCount() > 0) {
         return false;
       }
     }
@@ -2337,10 +2338,6 @@ public class PRHARedundancyProvider {
     }
   }
 
-  public CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
-    return allBucketsRecoveredFromDisk;
-  }
-
   private ThreadsMonitoring getThreadMonitorObj() {
     DistributionManager distributionManager = this.prRegion.getDistributionManager();
     if (distributionManager != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
similarity index 91%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
index f8b98ac..54e2130 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
@@ -48,7 +48,7 @@ import org.apache.geode.internal.util.TransformUtils;
  * count down latch in order to determine when it is finished.
  *
  */
-public class RedundancyLogger extends RecoveryRunnable implements PersistentStateListener {
+public class PersistentBucketRecoverer extends RecoveryRunnable implements PersistentStateListener {
 
   private static final Logger logger = LogService.getLogger();
 
@@ -71,14 +71,15 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
 
 
   /**
-   * Creates a new RedundancyLogger.
+   * Creates a new PersistentBucketRecoverer.
    *
    */
-  public RedundancyLogger(PRHARedundancyProvider prhaRedundancyProvider) {
+  public PersistentBucketRecoverer(PRHARedundancyProvider prhaRedundancyProvider,
+      int proxyBuckets) {
     super(prhaRedundancyProvider);
     PartitionedRegion baseRegion = ColocationHelper.getLeaderRegion(redundancyProvider.prRegion);
     List<PartitionedRegion> colocatedRegions =
-        ColocationHelper.getColocatedChildRegions(baseRegion);
+        getColocatedChildRegions(baseRegion);
     List<RegionStatus> allRegions = new ArrayList<RegionStatus>(colocatedRegions.size() + 1);
     if (baseRegion.getDataPolicy().withPersistence()) {
       allRegions.add(new RegionStatus(baseRegion));
@@ -89,14 +90,16 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
       }
     }
 
-    this.regions = Collections.unmodifiableList(allRegions);
-
-
-    this.allBucketsRecoveredFromDisk = redundancyProvider.getAllBucketsRecoveredFromDiskLatch();
-    this.membershipChanged = true;
+    regions = Collections.unmodifiableList(allRegions);
+    allBucketsRecoveredFromDisk = new CountDownLatch(proxyBuckets);
+    membershipChanged = true;
     addListeners();
   }
 
+  List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion baseRegion) {
+    return ColocationHelper.getColocatedChildRegions(baseRegion);
+  }
+
   /**
    * Called when a member comes online for a bucket.
    */
@@ -124,7 +127,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
 
 
   /**
-   * Add this RedundancyLogger as a persistence listener to all the region's bucket advisors.
+   * Adds this PersistentBucketRecoverer as a persistence listener to all the region's bucket
+   * advisors.
    */
   private void addListeners() {
     for (RegionStatus region : regions) {
@@ -133,7 +137,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
   }
 
   /**
-   * Removes this RedundancyLogger as a persistence listener from all the region's bucket advisors.
+   * Removes this PersistentBucketRecoverer as a persistence listener from all the region's bucket
+   * advisors.
    */
   private void removeListeners() {
     for (RegionStatus region : regions) {
@@ -196,7 +201,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
    */
   private class RegionStatus {
     /**
-     * The persistent identifier of the member running this RedundancyLogger.
+     * The persistent identifier of the member running this PersistentBucketRecoverer.
      */
     private final PersistentMemberID thisMember;
 
@@ -223,13 +228,13 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
 
     public void removeListeners() {
       for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
-        proxyBucket.getPersistenceAdvisor().removeListener(RedundancyLogger.this);
+        proxyBucket.getPersistenceAdvisor().removeListener(PersistentBucketRecoverer.this);
       }
     }
 
     public void addListeners() {
       for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
-        proxyBucket.getPersistenceAdvisor().addListener(RedundancyLogger.this);
+        proxyBucket.getPersistenceAdvisor().addListener(PersistentBucketRecoverer.this);
       }
     }
 
@@ -337,7 +342,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
       Map<PersistentMemberID, Set<Integer>> allMembersToWaitFor = getMembersToWaitFor(false);
 
       boolean thereAreBucketsToBeRecovered =
-          (RedundancyLogger.this.allBucketsRecoveredFromDisk.getCount() > 0);
+          (PersistentBucketRecoverer.this.allBucketsRecoveredFromDisk.getCount() > 0);
 
       /*
        * Log any offline members the region is waiting for.
@@ -401,4 +406,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
       return allWaitingBuckets;
     }
   }
+
+  public CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
+    return allBucketsRecoveredFromDisk;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index 5c393ff..2065987 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -14,24 +14,19 @@
  */
 package org.apache.geode.internal.cache;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.util.concurrent.CountDownLatch;
 
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.InOrder;
 
-import org.apache.geode.internal.cache.partitioned.RedundancyLogger;
-import org.apache.geode.internal.logging.LoggingThread;
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
 
 
 public class PRHARedundancyProviderTest {
@@ -44,41 +39,25 @@ public class PRHARedundancyProviderTest {
   }
 
   @Test
-  public void waitForPersistentBucketRecoveryProceedsWhenAllBucketsRecoveredFromDiskLatchIsNull() {
+  public void waitForPersistentBucketRecoveryProceedsWhenPersistentBucketRecovererLatchIsNotSet() {
+    PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
+    doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+
     provider.waitForPersistentBucketRecovery();
   }
 
   @Test
   public void waitForPersistentBucketRecoveryProceedsAfterLatchCountDown() throws Exception {
-    provider.allBucketsRecoveredFromDisk = spy(new CountDownLatch(1));
-    provider.allBucketsRecoveredFromDisk.countDown();
+    PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
+    doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+    CountDownLatch latch = spy(new CountDownLatch(1));
+    when(recoverer.getAllBucketsRecoveredFromDiskLatch()).thenReturn(latch);
+    latch.countDown();
 
     provider.waitForPersistentBucketRecovery();
 
-    verify(provider.allBucketsRecoveredFromDisk).await();
+    verify(latch).await();
   }
 
-  @Test
-  public void allBucketsRecoveredFromDiskCountDownLatchIsNotSetIfFailedToCreateRedundancyLogger() {
-    doThrow(new RuntimeException()).when(provider).createRedundancyLogger();
-
-    Throwable thrown =
-        catchThrowable(() -> provider.createRedundancyLoggerAndStartLoggingThread(1));
-
-    assertThat(provider.allBucketsRecoveredFromDisk).isNull();
-    assertThat(thrown).isInstanceOf(RuntimeException.class);
-  }
 
-  @Test
-  public void allBucketsRecoveredFromDiskIsSetBeforeLoggingThreadStarts() {
-    LoggingThread thread = mock(LoggingThread.class);
-    doReturn(mock(RedundancyLogger.class)).when(provider).createRedundancyLogger();
-    doReturn(thread).when(provider).createRedundancyLoggingThread();
-
-    provider.createRedundancyLoggerAndStartLoggingThread(1);
-
-    InOrder inOrder = inOrder(provider, thread);
-    inOrder.verify(provider).createAllBucketsRecoveredFromDisk(1);
-    inOrder.verify(thread).start();
-  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
new file mode 100644
index 0000000..c2afc3a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PRHARedundancyProvider;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+
+public class PersistentBucketRecovererTest {
+  @Test
+  public void allBucketsRecoveredFromDiskCountDownLatchIsSet() {
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+    InternalCache cache = mock(InternalCache.class);
+    DistributedRegion root = mock(DistributedRegion.class);
+    when(partitionedRegion.getCache()).thenReturn(cache);
+    when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
+    PRHARedundancyProvider provider = new PRHARedundancyProvider(partitionedRegion);
+    int numberOfProxyBuckets = 5;
+
+    PersistentBucketRecoverer recoverer =
+        new PersistentBucketRecoverer(provider, numberOfProxyBuckets);
+
+    assertThat(recoverer.getAllBucketsRecoveredFromDiskLatch()).isNotNull();
+    assertThat(recoverer.getAllBucketsRecoveredFromDiskLatch().getCount())
+        .isEqualTo(numberOfProxyBuckets);
+  }
+}