You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/04/18 19:59:46 UTC
[geode] branch develop updated: GEODE-6630: move allBucketsRecoveredFromDisk count down latch (#3477)
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 49757e7 GEODE-6630: move allBucketsRecoveredFromDisk count down latch (#3477)
49757e7 is described below
commit 49757e7e3706854c0775180d47b3d2825579f3f3
Author: pivotal-eshu <es...@pivotal.io>
AuthorDate: Thu Apr 18 12:59:30 2019 -0700
GEODE-6630: move allBucketsRecoveredFromDisk count down latch (#3477)
* Rename RedundancyLogger to PersistentBucketRecoverer
* Move allBucketsRecoveredFromDisk count down latch from PRHARedundancyProvider to PersistentBucketRecoverer.
* Provide utility methods for using the count down latch.
---
.../internal/cache/PRHARedundancyProvider.java | 90 ++++++-----------
...yLogger.java => PersistentBucketRecoverer.java} | 111 +++++++++++++++++----
.../internal/cache/PRHARedundancyProviderTest.java | 22 ++--
.../partitioned/PersistentBucketRecovererTest.java | 69 +++++++++++++
4 files changed, 205 insertions(+), 87 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index 2942154..26c3406 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -75,8 +74,8 @@ import org.apache.geode.internal.cache.partitioned.PRLoad;
import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
import org.apache.geode.internal.cache.partitioned.PartitionRegionInfoImpl;
import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
import org.apache.geode.internal.cache.partitioned.RecoveryRunnable;
-import org.apache.geode.internal.cache.partitioned.RedundancyLogger;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
import org.apache.geode.internal.cache.partitioned.rebalance.CompositeDirector;
@@ -144,12 +143,10 @@ public class PRHARedundancyProvider {
private final Object shutdownLock = new Object();
private boolean shutdown = false;
- volatile CountDownLatch allBucketsRecoveredFromDisk;
-
/**
* Used to consolidate logging for bucket regions waiting on other members to come online.
*/
- private RedundancyLogger redundancyLogger = null;
+ private volatile PersistentBucketRecoverer persistentBucketRecoverer = null;
/**
* Constructor for PRHARedundancyProvider.
@@ -1685,6 +1682,10 @@ public class PRHARedundancyProvider {
final ProxyBucketRegion[] proxyBucketArray =
persistentLeader.getRegionAdvisor().getProxyBucketArray();
+ if (proxyBucketArray.length == 0) {
+ throw new IllegalStateException("Unexpected empty proxy bucket array");
+ }
+
for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
proxyBucket.initializePersistenceAdvisor();
}
@@ -1707,14 +1708,9 @@ public class PRHARedundancyProvider {
ArrayList<ProxyBucketRegion> bucketsHostedLocally =
new ArrayList<ProxyBucketRegion>(proxyBucketArray.length);
+ createPersistentBucketRecoverer(proxyBucketArray.length);
/*
- * Start the redundancy logger before recovering any proxy buckets.
- */
- startRedundancyLogger(proxyBucketArray.length);
-
- allBucketsRecoveredFromDisk = new CountDownLatch(proxyBucketArray.length);
- /*
* Spawn a separate thread for bucket that we previously hosted to recover that bucket.
*
* That thread will get to the point at which it has determined that at least one member
@@ -1734,7 +1730,6 @@ public class PRHARedundancyProvider {
if (proxyBucket.getPersistenceAdvisor().wasHosting()) {
final RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) {
-
@Override
public void run() {
// Fix for 44551 - make sure that we always count down
@@ -1742,7 +1737,9 @@ public class PRHARedundancyProvider {
try {
super.run();
} finally {
- allBucketsRecoveredFromDisk.countDown();
+ if (getPersistentBucketRecoverer() != null) {
+ getPersistentBucketRecoverer().countDown();
+ }
}
}
@@ -1773,8 +1770,8 @@ public class PRHARedundancyProvider {
proxyBucket.recoverFromDiskRecursively();
}
} finally {
- for (final ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
- allBucketsRecoveredFromDisk.countDown();
+ if (getPersistentBucketRecoverer() != null) {
+ getPersistentBucketRecoverer().countDown(bucketsNotHostedLocally.size());
}
}
@@ -1784,13 +1781,13 @@ public class PRHARedundancyProvider {
// }
}
- void startRedundancyLogger(int proxyBuckets) {
- if (proxyBuckets > 0) {
- redundancyLogger = new RedundancyLogger(this);
- Thread loggingThread = new LoggingThread(
- "RedundancyLogger for region " + this.prRegion.getName(), false, this.redundancyLogger);
- loggingThread.start();
- }
+ private void createPersistentBucketRecoverer(int proxyBuckets) {
+ persistentBucketRecoverer = new PersistentBucketRecoverer(this, proxyBuckets);
+ persistentBucketRecoverer.startLoggingThread();
+ }
+
+ PersistentBucketRecoverer getPersistentBucketRecoverer() {
+ return persistentBucketRecoverer;
}
/**
@@ -1981,24 +1978,9 @@ public class PRHARedundancyProvider {
* whichever happens first.
*/
protected void waitForPersistentBucketRecoveryOrClose() {
- CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
- if (recoveryLatch != null) {
- boolean interrupted = false;
- while (true) {
- try {
- this.prRegion.getCancelCriterion().checkCancelInProgress(null);
- boolean done = recoveryLatch.await(
- PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION, TimeUnit.MILLISECONDS);
- if (done) {
- break;
- }
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
+ if (getPersistentBucketRecoverer() != null) {
+ getPersistentBucketRecoverer().await(
+ PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION, TimeUnit.MILLISECONDS);
}
List<PartitionedRegion> colocatedRegions =
@@ -2013,20 +1995,8 @@ public class PRHARedundancyProvider {
* currently being closed.
*/
protected void waitForPersistentBucketRecovery() {
- CountDownLatch recoveryLatch = allBucketsRecoveredFromDisk;
- if (recoveryLatch != null) {
- boolean interrupted = false;
- while (true) {
- try {
- recoveryLatch.await();
- break;
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
+ if (getPersistentBucketRecoverer() != null) {
+ getPersistentBucketRecoverer().await();
}
}
@@ -2035,7 +2005,8 @@ public class PRHARedundancyProvider {
return false;
}
- if (allBucketsRecoveredFromDisk != null && allBucketsRecoveredFromDisk.getCount() > 0) {
+ if (getPersistentBucketRecoverer() != null
+ && !getPersistentBucketRecoverer().hasRecoveryCompleted()) {
return false;
}
@@ -2044,12 +2015,11 @@ public class PRHARedundancyProvider {
for (PartitionedRegion region : colocatedRegions.values()) {
PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider();
- if (redundancyProvider.allBucketsRecoveredFromDisk != null
- && redundancyProvider.allBucketsRecoveredFromDisk.getCount() > 0) {
+ if (redundancyProvider.getPersistentBucketRecoverer() != null &&
+ !redundancyProvider.getPersistentBucketRecoverer().hasRecoveryCompleted()) {
return false;
}
}
-
return true;
}
@@ -2323,10 +2293,6 @@ public class PRHARedundancyProvider {
}
}
- public CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
- return allBucketsRecoveredFromDisk;
- }
-
private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = this.prRegion.getDistributionManager();
if (distributionManager != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
similarity index 81%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
index f8b98ac..5654c61 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RedundancyLogger.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecoverer.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
@@ -38,17 +39,17 @@ import org.apache.geode.internal.cache.ProxyBucketRegion;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.persistence.PersistentStateListener;
import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.process.StartupStatus;
import org.apache.geode.internal.util.TransformUtils;
/**
* Consolidates logging during the recovery of ProxyRegionBuckets that are not hosted by this
- * member. This logger is meant to run in its own thread and utilizes the PRHARedundancyProvider's
- * count down latch in order to determine when it is finished.
- *
+ * member. The logger is meant to run in its own thread.
+ * It uses a count down latch to determine whether the recovery is finished.
*/
-public class RedundancyLogger extends RecoveryRunnable implements PersistentStateListener {
+public class PersistentBucketRecoverer extends RecoveryRunnable implements PersistentStateListener {
private static final Logger logger = LogService.getLogger();
@@ -71,14 +72,15 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
/**
- * Creates a new RedundancyLogger.
+ * Creates a new PersistentBucketRecoverer.
*
*/
- public RedundancyLogger(PRHARedundancyProvider prhaRedundancyProvider) {
+ public PersistentBucketRecoverer(PRHARedundancyProvider prhaRedundancyProvider,
+ int proxyBuckets) {
super(prhaRedundancyProvider);
PartitionedRegion baseRegion = ColocationHelper.getLeaderRegion(redundancyProvider.prRegion);
List<PartitionedRegion> colocatedRegions =
- ColocationHelper.getColocatedChildRegions(baseRegion);
+ getColocatedChildRegions(baseRegion);
List<RegionStatus> allRegions = new ArrayList<RegionStatus>(colocatedRegions.size() + 1);
if (baseRegion.getDataPolicy().withPersistence()) {
allRegions.add(new RegionStatus(baseRegion));
@@ -89,12 +91,22 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
}
}
- this.regions = Collections.unmodifiableList(allRegions);
+ regions = Collections.unmodifiableList(allRegions);
+ allBucketsRecoveredFromDisk = new CountDownLatch(proxyBuckets);
+ membershipChanged = true;
+ addListeners();
+ }
- this.allBucketsRecoveredFromDisk = redundancyProvider.getAllBucketsRecoveredFromDiskLatch();
- this.membershipChanged = true;
- addListeners();
+ List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion baseRegion) {
+ return ColocationHelper.getColocatedChildRegions(baseRegion);
+ }
+
+ public void startLoggingThread() {
+ Thread loggingThread = new LoggingThread(
+ "PersistentBucketRecoverer for region " + redundancyProvider.prRegion.getName(), false,
+ this);
+ loggingThread.start();
}
/**
@@ -124,7 +136,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
/**
- * Add this RedundancyLogger as a persistence listener to all the region's bucket advisors.
+ * Add this PersistentBucketRecoverer as a persistence listener to all the region's bucket
+ * advisors.
*/
private void addListeners() {
for (RegionStatus region : regions) {
@@ -133,7 +146,8 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
}
/**
- * Removes this RedundancyLogger as a persistence listener from all the region's bucket advisors.
+ * Removes this PersistentBucketRecoverer as a persistence listener from all the region's bucket
+ * advisors.
*/
private void removeListeners() {
for (RegionStatus region : regions) {
@@ -151,7 +165,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
public void run2() {
try {
boolean warningLogged = false;
- while (this.allBucketsRecoveredFromDisk.getCount() > 0) {
+ while (getLatchCount() > 0) {
int sleepMillis = SLEEP_PERIOD;
// reduce the first log time from 15secs so that higher layers can
// report sooner to user
@@ -196,7 +210,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
*/
private class RegionStatus {
/**
- * The persistent identifier of the member running this RedundancyLogger.
+ * The persistent identifier of the member running this PersistentBucketRecoverer.
*/
private final PersistentMemberID thisMember;
@@ -223,13 +237,13 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
public void removeListeners() {
for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
- proxyBucket.getPersistenceAdvisor().removeListener(RedundancyLogger.this);
+ proxyBucket.getPersistenceAdvisor().removeListener(PersistentBucketRecoverer.this);
}
}
public void addListeners() {
for (ProxyBucketRegion proxyBucket : this.bucketRegions) {
- proxyBucket.getPersistenceAdvisor().addListener(RedundancyLogger.this);
+ proxyBucket.getPersistenceAdvisor().addListener(PersistentBucketRecoverer.this);
}
}
@@ -336,8 +350,7 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
Map<PersistentMemberID, Set<Integer>> offlineMembers = getMembersToWaitFor(true);
Map<PersistentMemberID, Set<Integer>> allMembersToWaitFor = getMembersToWaitFor(false);
- boolean thereAreBucketsToBeRecovered =
- (RedundancyLogger.this.allBucketsRecoveredFromDisk.getCount() > 0);
+ boolean thereAreBucketsToBeRecovered = (getLatchCount() > 0);
/*
* Log any offline members the region is waiting for.
@@ -401,4 +414,64 @@ public class RedundancyLogger extends RecoveryRunnable implements PersistentStat
return allWaitingBuckets;
}
}
+
+ public void await(long timeout, TimeUnit unit) {
+ boolean interrupted = false;
+ while (true) {
+ try {
+ redundancyProvider.prRegion.getCancelCriterion().checkCancelInProgress(null);
+ boolean done = allBucketsRecoveredFromDisk.await(timeout, unit);
+ if (done) {
+ break;
+ }
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void await() {
+ boolean interrupted = false;
+ while (true) {
+ try {
+ getAllBucketsRecoveredFromDiskLatch().await();
+ break;
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void countDown() {
+ allBucketsRecoveredFromDisk.countDown();
+ }
+
+ public void countDown(int size) {
+ while (size > 0) {
+ allBucketsRecoveredFromDisk.countDown();
+ --size;
+ }
+ }
+
+ public boolean hasRecoveryCompleted() {
+ if (getLatchCount() > 0) {
+ return false;
+ }
+ return true;
+ }
+
+ long getLatchCount() {
+ return allBucketsRecoveredFromDisk.getCount();
+ }
+
+ CountDownLatch getAllBucketsRecoveredFromDiskLatch() {
+ return allBucketsRecoveredFromDisk;
+ }
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index e650cbc..c5fcff5 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -15,15 +15,17 @@
package org.apache.geode.internal.cache;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.CountDownLatch;
+import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Test;
+import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
+
public class PRHARedundancyProviderTest {
private PRHARedundancyProvider provider;
@@ -31,21 +33,29 @@ public class PRHARedundancyProviderTest {
@Before
public void setup() {
PartitionedRegion partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+ InternalCache cache = mock(InternalCache.class);
+ DistributedRegion root = mock(DistributedRegion.class);
+ when(partitionedRegion.getCache()).thenReturn(cache);
+ when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
provider = spy(new PRHARedundancyProvider(partitionedRegion));
}
@Test
- public void waitForPersistentBucketRecoveryProceedsWhenAllBucketsRecoveredFromDiskLatchIsNull() {
+ public void waitForPersistentBucketRecoveryProceedsWhenPersistentBucketRecovererLatchIsNotSet() {
+ PersistentBucketRecoverer recoverer = mock(PersistentBucketRecoverer.class);
+ doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+
provider.waitForPersistentBucketRecovery();
}
@Test
public void waitForPersistentBucketRecoveryProceedsAfterLatchCountDown() throws Exception {
- provider.allBucketsRecoveredFromDisk = spy(new CountDownLatch(1));
- provider.allBucketsRecoveredFromDisk.countDown();
+ PersistentBucketRecoverer recoverer = spy(new PersistentBucketRecoverer(provider, 1));
+ doReturn(recoverer).when(provider).getPersistentBucketRecoverer();
+ provider.getPersistentBucketRecoverer().countDown();
provider.waitForPersistentBucketRecovery();
- verify(provider.allBucketsRecoveredFromDisk).await();
+ verify(recoverer).await();
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
new file mode 100644
index 0000000..12168cd
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PersistentBucketRecovererTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PRHARedundancyProvider;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+
+public class PersistentBucketRecovererTest {
+ private PartitionedRegion partitionedRegion;
+ private InternalCache cache;
+ private DistributedRegion root;
+ private PRHARedundancyProvider provider;
+
+ @Before
+ public void setUp() {
+ partitionedRegion = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
+ cache = mock(InternalCache.class);
+ root = mock(DistributedRegion.class);
+ when(partitionedRegion.getCache()).thenReturn(cache);
+ when(cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(root);
+ provider = new PRHARedundancyProvider(partitionedRegion);
+ }
+
+ @Test
+ public void allBucketsRecoveredFromDiskCountDownLatchIsSet() {
+ int numberOfProxyBuckets = 5;
+
+ PersistentBucketRecoverer recoverer =
+ new PersistentBucketRecoverer(provider, numberOfProxyBuckets);
+
+ assertThat(recoverer.getAllBucketsRecoveredFromDiskLatch()).isNotNull();
+ assertThat(recoverer.getLatchCount()).isEqualTo(numberOfProxyBuckets);
+ }
+
+ @Test
+ public void latchCanBeCountedDown() {
+ int numberOfProxyBuckets = 5;
+ PersistentBucketRecoverer recoverer =
+ new PersistentBucketRecoverer(provider, numberOfProxyBuckets);
+
+ assertThat(recoverer.getLatchCount()).isEqualTo(numberOfProxyBuckets);
+ recoverer.countDown(numberOfProxyBuckets);
+
+ recoverer.await();
+ }
+}