You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/04/17 22:29:18 UTC
[geode] branch feature/GEODE-6630 updated: fix a review comment.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-6630
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6630 by this push:
new 7edf667 fix a review comment.
7edf667 is described below
commit 7edf667dde099c4de77622fdafe28d28e366b974
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Apr 17 15:28:09 2019 -0700
fix a review comment.
---
.../internal/cache/PRHARedundancyProvider.java | 59 ++++++++++------------
...yLogger.java => PersistentBucketRecoverer.java} | 39 ++++++++------
.../internal/cache/PRHARedundancyProviderTest.java | 45 +++++------------
.../partitioned/PersistentBucketRecovererTest.java | 48 ++++++++++++++++++
4 files changed, 112 insertions(+), 79 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index f519914..7f951fb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -75,8 +75,8 @@ import org.apache.geode.internal.cache.partitioned.PRLoad;
import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
import org.apache.geode.internal.cache.partitioned.PartitionRegionInfoImpl;
import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
import org.apache.geode.internal.cache.partitioned.RecoveryRunnable;
-import org.apache.geode.internal.cache.partitioned.RedundancyLogger;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
import org.apache.geode.internal.cache.partitioned.rebalance.CompositeDirector;
@@ -144,12 +144,10 @@ public class PRHARedundancyProvider {
private final Object shutdownLock = new Object();
private boolean shutdown = false;
- volatile CountDownLatch allBucketsRecoveredFromDisk;
-
/**
* Used to consolidate logging for bucket regions waiting on other members to come online.
*/
- private RedundancyLogger redundancyLogger = null;
+ private volatile PersistentBucketRecoverer persistentBucketRecoverer = null;
/**
* Constructor for PRHARedundancyProvider.
@@ -1715,7 +1713,11 @@ public class PRHARedundancyProvider {
/*
* Start the redundancy logger before recovering any proxy buckets.
*/
- createRedundancyLoggerAndStartLoggingThread(proxyBucketArray.length);
+ persistentBucketRecoverer = createPersistentBucketRecoverer(proxyBucketArray.length);
+ Thread loggingThread = new LoggingThread(
+ "PersistentBucketRecoverer for region " + prRegion.getName(), false,
+ persistentBucketRecoverer);
+ loggingThread.start();
/*
* Spawn a separate thread for bucket that we previously hosted to recover that bucket.
@@ -1745,7 +1747,10 @@ public class PRHARedundancyProvider {
try {
super.run();
} finally {
- allBucketsRecoveredFromDisk.countDown();
+ CountDownLatch recoveryLatch = getRecoveryLatch();
+ if (recoveryLatch != null) {
+ recoveryLatch.countDown();
+ }
}
}
@@ -1777,7 +1782,10 @@ public class PRHARedundancyProvider {
}
} finally {
for (final ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
- allBucketsRecoveredFromDisk.countDown();
+ CountDownLatch recoveryLatch = getRecoveryLatch();
+ if (recoveryLatch != null) {
+ recoveryLatch.countDown();
+ }
}
}
@@ -1787,24 +1795,17 @@ public class PRHARedundancyProvider {
// }
}
- void createRedundancyLoggerAndStartLoggingThread(int proxyBuckets) {
- redundancyLogger = createRedundancyLogger();
- allBucketsRecoveredFromDisk = createAllBucketsRecoveredFromDisk(proxyBuckets);
- Thread loggingThread = createRedundancyLoggingThread();
- loggingThread.start();
- }
-
- LoggingThread createRedundancyLoggingThread() {
- return new LoggingThread(
- "RedundancyLogger for region " + prRegion.getName(), false, redundancyLogger);
+ CountDownLatch getRecoveryLatch() {
+ return getPersistentBucketRecoverer() == null ? null
+ : getPersistentBucketRecoverer().getAllBucketsRecoveredFromDiskLatch();
}
- CountDownLatch createAllBucketsRecoveredFromDisk(int proxyBuckets) {
- return new CountDownLatch(proxyBuckets);
+ private PersistentBucketRecoverer createPersistentBucketRecoverer(int proxyBuckets) {
+ return new PersistentBucketRecoverer(this, proxyBuckets);
}
- RedundancyLogger createRedundancyLogger() {
- return new RedundancyLogger(this);
+ PersistentBucketRecoverer getPersistentBucketRecoverer() {
+ return persistentBucketRecoverer;
}
/**
@@ -1995,7 +1996,7 @@ public class PRHARedundancyProvider {
* whichever happens first.
*/
protected void waitForPersistentBucketRecoveryOrClose() {
- CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
+ CountDownLatch recoveryLatch = getRecoveryLatch();
if (recoveryLatch != null) {
boolean interrupted = false;
while (true) {
@@ -2027,7 +2028,7 @@ public class PRHARedundancyProvider {
* currently being closed.
*/
protected void waitForPersistentBucketRecovery() {
- CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
+ CountDownLatch recoveryLatch = getRecoveryLatch();
if (recoveryLatch != null) {
boolean interrupted = false;
while (true) {
@@ -2048,8 +2049,8 @@ public class PRHARedundancyProvider {
if (!ColocationHelper.checkMembersColocation(this.prRegion, this.prRegion.getMyId())) {
return false;
}
-
- if (allBucketsRecoveredFromDisk != null && allBucketsRecoveredFromDisk.getCount() > 0) {
+ CountDownLatch recoveryLatch = getRecoveryLatch();
+ if (recoveryLatch != null && recoveryLatch.getCount() > 0) {
return false;
}
@@ -2058,8 +2059,8 @@ public class PRHARedundancyProvider {
for (PartitionedRegion region : colocatedRegions.values()) {
PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider();
- if (redundancyProvider.allBucketsRecoveredFromDisk != null
- && redundancyProvider.allBucketsRecoveredFromDisk.getCount() > 0) {
+ recoveryLatch = redundancyProvider.getRecoveryLatch();
+ if (recoveryLatch != null && recoveryLatch.getCount() > 0) {
return false;
}
}
@@ -2337,10 +2338,6 @@ public class PRHARedundancyProvider {
}
}
- public CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
- return allBucketsRecoveredFromDisk;
- }
-
private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = this.prRegion.getDistributionManager();
if (distributionManager != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
similarity index 91%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
index f8b98ac..54e2130 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
@@ -48,7 +48,7 @@ import org.apache.geode.internal.util.TransformUtils;
* count down latch in order to determine when it is finished.
*
*/
-public class RedundancyLogger extends RecoveryRunnable implements PersistentStateListener {
+public class PersistentBucketRecoverer extends RecoveryRunnable implements PersistentStateListener {
private static final Logger logger = LogService.getLogger();
@@ -71,14 +71,15 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
/**
- * Creates a new RedundancyLogger.
+ * Creates a new PersistentBucketRecoverer.
*
*/
- public RedundancyLogger(PRHARedundancyProvider prhaRedundancyProvider) {
+ public PersistentBucketRecoverer(PRHARedundancyProvider prhaRedundancyProvider,
+ int proxyBuckets) {
super(prhaRedundancyProvider);
PartitionedRegion baseRegion = ColocationHelper.getLeaderRegion(redundancyProvider.prRegion);
List<PartitionedRegion> colocatedRegions =
- ColocationHelper.getColocatedChildRegions(baseRegion);
+ getColocatedChildRegions(baseRegion);
List<RegionStatus> allRegions = new ArrayList<RegionStatus>(colocatedRegions.size() + 1);
if (baseRegion.getDataPolicy().withPersistence()) {
allRegions.add(new RegionStatus(baseRegion));
@@ -89,14 +90,16 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
}
}
- this.regions = Collections.unmodifiableList(allRegions);
-
-
- this.allBucketsRecoveredFromDisk = redundancyProvider.getAllBucketsRecoveredFromDiskLatch();
- this.membershipChanged = true;
+ regions = Collections.unmodifiableList(allRegions);
+ allBucketsRecoveredFromDisk = new CountDownLatch(proxyBuckets);
+ membershipChanged = true;
addListeners();
}
+ List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion baseRegion) {
+ return ColocationHelper.getColocatedChildRegions(baseRegion);
+ }
+
/**
* Called when a member comes online for a bucket.
*/
@@ -124,7 +127,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
/**
- * Add this RedundancyLogger as a persistence listener to all the region's bucket advisors.
+ * Add this PersistentBucketRecoverer as a persistence listener to all the region's bucket
+ * advisors.
*/
private void addListeners() {
for (RegionStatus region : regions) {
@@ -133,7 +137,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
}
/**
- * Removes this RedundancyLogger as a persistence listener from all the region's bucket advisors.
+ * Removes this PersistentBucketRecoverer as a persistence listener from all the region's bucket
+ * advisors.
*/
private void removeListeners() {
for (RegionStatus region : regions) {
@@ -196,7 +201,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
*/
private class RegionStatus {
/**
- * The persistent identifier of the member running this RedundancyLogger.
+ * The persistent identifier of the member running this PersistentBucketRecoverer.
*/
private final PersistentMemberID thisMember;
@@ -223,13 +228,13 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
public void removeListeners() {
for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
- proxyBucket.getPersistenceAdvisor().removeListener(RedundancyLogger.this);
+ proxyBucket.getPersistenceAdvisor().removeListener(PersistentBucketRecoverer.this);
}
}
public void addListeners() {
for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
- proxyBucket.getPersistenceAdvisor().addListener(RedundancyLogger.this);
+ proxyBucket.getPersistenceAdvisor().addListener(PersistentBucketRecoverer.this);
}
}
@@ -337,7 +342,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
Map<PersistentMemberID, Set<Integer>> allMembersToWaitFor = getMembersToWaitFor(false);
boolean thereAreBucketsToBeRecovered =
- (RedundancyLogger.this.allBucketsRecoveredFromDisk.getCount() > 0);
+ (PersistentBucketRecoverer.this.allBucketsRecoveredFromDisk.getCount() > 0);
/*
* Log any offline members the region is waiting for.
@@ -401,4 +406,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
return allWaitingBuckets;
}
}
+
+ public CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
+ return allBucketsRecoveredFromDisk;
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index 5c393ff..2065987 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -14,24 +14,19 @@
*/
package org.apache.geode.internal.cache;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.InOrder;
-import org.apache.geode.internal.cache.partitioned.RedundancyLogger;
-import org.apache.geode.internal.logging.LoggingThread;
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
public class PRHARedundancyProviderTest {
@@ -44,41 +39,25 @@ public class PRHARedundancyProviderTest {
}
@Test
- public void waitForPersistentBucketRecoveryProceedsWhenAllBucketsRecoveredFromDiskLatchIsNull() {
+ public void waitForPersistentBucketRecoveryProceedsWhenPersistentBucketRecovererLatchIsNotSet() {
+ PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
+ doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+
provider.waitForPersistentBucketRecovery();
}
@Test
public void waitForPersistentBucketRecoveryProceedsAfterLatchCountDown() throws Exception {
- provider.allBucketsRecoveredFromDisk = spy(new CountDownLatch(1));
- provider.allBucketsRecoveredFromDisk.countDown();
+ PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
+ doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+ CountDownLatch latch = spy(new CountDownLatch(1));
+ when(recoverer.getAllBucketsRecoveredFromDiskLatch()).thenReturn(latch);
+ latch.countDown();
provider.waitForPersistentBucketRecovery();
- verify(provider.allBucketsRecoveredFromDisk).await();
+ verify(latch).await();
}
- @Test
- public void allBucketsRecoveredFromDiskCountDownLatchIsNotSetIfFailedToCreateRedundancyLogger() {
- doThrow(new RuntimeException()).when(provider).createRedundancyLogger();
-
- Throwable thrown =
- catchThrowable(() -> provider.createRedundancyLoggerAndStartLoggingThread(1));
-
- assertThat(provider.allBucketsRecoveredFromDisk).isNull();
- assertThat(thrown).isInstanceOf(RuntimeException.class);
- }
- @Test
- public void allBucketsRecoveredFromDiskIsSetBeforeLoggingThreadStarts() {
- LoggingThread thread = mock(LoggingThread.class);
- doReturn(mock(RedundancyLogger.class)).when(provider).createRedundancyLogger();
- doReturn(thread).when(provider).createRedundancyLoggingThread();
-
- provider.createRedundancyLoggerAndStartLoggingThread(1);
-
- InOrder inOrder = inOrder(provider, thread);
- inOrder.verify(provider).createAllBucketsRecoveredFromDisk(1);
- inOrder.verify(thread).start();
- }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
new file mode 100644
index 0000000..c2afc3a
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PRHARedundancyProvider;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+
+public class PersistentBucketRecovererTest {
+ @Test
+ public void allBucketsRecoveredFromDiskCountDownLatchIsSet() {
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+ InternalCache cache = mock(InternalCache.class);
+ DistributedRegion root = mock(DistributedRegion.class);
+ when(partitionedRegion.getCache()).thenReturn(cache);
+ when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
+ PRHARedundancyProvider provider = new PRHARedundancyProvider(partitionedRegion);
+ int numberOfProxyBuckets = 5;
+
+ PersistentBucketRecoverer recoverer =
+ new PersistentBucketRecoverer(provider, numberOfProxyBuckets);
+
+ assertThat(recoverer.getAllBucketsRecoveredFromDiskLatch()).isNotNull();
+ assertThat(recoverer.getAllBucketsRecoveredFromDiskLatch().getCount())
+ .isEqualTo(numberOfProxyBuckets);
+ }
+}