You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jm...@apache.org on 2022/02/23 18:32:56 UTC
[geode] branch support/1.14 updated: GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7379)
This is an automated email from the ASF dual-hosted git repository.
jmelchior pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push:
new dede61d GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7379)
dede61d is described below
commit dede61d4813ecba9a51c7417214ea69092a88172
Author: Joris Melchior <jo...@gmail.com>
AuthorDate: Wed Feb 23 13:30:48 2022 -0500
GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7379)
* GEODE-9990: turn DiskAccessException into CacheClosedException
- when DiskInitFile is in closed state and DiskStoreImpl is closed or
closing
- catch DiskAccessException in PRHARedundancyProvider and turn into
CacheClosedException if cache closing is in progress
- change CreateBucketMessage to handle DiskAccessException as cause of
ReplyException
(cherry picked from commit a98197b5d0a3a2547e0581e475dcabaa82e6e92f)
---
.../internal/cache/DiskInitFileJUnitTest.java | 72 ++++++++++++++++++++++
.../apache/geode/internal/cache/DiskInitFile.java | 27 +++++---
.../internal/cache/PRHARedundancyProvider.java | 10 +++
.../cache/partitioned/CreateBucketMessage.java | 5 +-
.../internal/cache/PRHARedundancyProviderTest.java | 72 ++++++++++++++++++++++
5 files changed, 177 insertions(+), 9 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
index 927616d..56b3e55 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
@@ -15,10 +15,15 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -31,8 +36,11 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.apache.geode.CancelCriterion;
import org.apache.geode.Statistics;
import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.persistence.DiskStoreID;
@@ -134,4 +142,68 @@ public class DiskInitFileJUnitTest {
assertThat(dif.hasKrf(2)).isFalse();
dif.destroy();
}
+
+ @Test
+ public void markInitializedThrowsDiskAccessExceptionWhenInitFileClosedAndParentAndCacheNotClosing() {
+ markInitializedTestSetup();
+
+ DiskInitFile diskInitFile =
+ new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+ diskInitFile.close();
+
+ assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
+ DiskAccessException.class);
+ }
+
+ @Test
+ public void markInitializedThrowsCacheClosedExceptionWhenInitFileClosedAndParentIsClosedOrClosing() {
+ markInitializedTestSetup();
+ when(mockedDiskStoreImpl.isClosed()).thenReturn(Boolean.TRUE);
+
+ DiskInitFile diskInitFile =
+ new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+ diskInitFile.close();
+
+ assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
+ CacheClosedException.class);
+ }
+
+ @Test
+ public void markInitializedThrowsCacheClosedExceptionWhenCacheIsClosing() {
+ CancelCriterion cancelCriterion = markInitializedTestSetup();
+ CacheClosedException cacheClosedException = new CacheClosedException("boom");
+ doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress();
+
+ DiskInitFile diskInitFile =
+ new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+ diskInitFile.close();
+
+ assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isEqualTo(
+ cacheClosedException);
+ }
+
+ @Test
+ public void markInitializedCacheCloseIsCalledWhenParentHandlesDiskAccessException() {
+ markInitializedTestSetup();
+
+ DiskInitFile diskInitFile =
+ new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+ diskInitFile.close();
+
+ assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView))
+ .isInstanceOf(DiskAccessException.class);
+ verify(mockedDiskStoreImpl, times(1)).handleDiskAccessException(any(DiskAccessException.class));
+ }
+
+ private CancelCriterion markInitializedTestSetup() {
+ InternalCache internalCache = mock(InternalCache.class);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ DiskRegion diskRegion = mock(DiskRegion.class);
+
+ when(mockedDiskStoreImpl.getCache()).thenReturn(internalCache);
+ when(mockedDiskStoreImpl.getById(anyLong())).thenReturn(diskRegion);
+ when(internalCache.getCancelCriterion()).thenReturn(cancelCriterion);
+
+ return cancelCriterion;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
index dd7b197..d3139ec 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
@@ -55,6 +55,7 @@ import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.Instantiator;
+import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAlgorithm;
@@ -1391,11 +1392,24 @@ public class DiskInitFile implements DiskInitFileInterpreter {
writeIFRecord(bb, true);
}
+ private void checkClosed() {
+ if (closed) {
+ parent.getCache().getCancelCriterion().checkCancelInProgress();
+
+ if (parent.isClosed() || parent.isClosing()) {
+ throw new CacheClosedException("The disk store is closed or closing");
+ }
+
+ DiskAccessException dae = new DiskAccessException("The disk init file is closed", parent);
+ parent.handleDiskAccessException(dae);
+
+ throw dae;
+ }
+ }
+
private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {
assert lock.isHeldByCurrentThread();
- if (this.closed) {
- throw new DiskAccessException("The disk store is closed", parent);
- }
+ checkClosed();
this.ifRAF.write(bb.array(), 0, bb.position());
if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
@@ -1411,10 +1425,9 @@ public class DiskInitFile implements DiskInitFileInterpreter {
private void writeIFRecord(HeapDataOutputStream hdos, boolean doStats) throws IOException {
assert lock.isHeldByCurrentThread();
- if (this.closed) {
- throw new DiskAccessException("The disk store is closed", parent);
- }
- hdos.sendTo(this.ifRAF);
+ checkClosed();
+
+ hdos.sendTo(ifRAF);
if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord HDOS");
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index e07ab55..0e24e0f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -43,11 +43,13 @@ import java.util.function.BiFunction;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
+import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
@@ -726,6 +728,14 @@ public class PRHARedundancyProvider {
return bucketPrimary;
}
}
+ } catch (DiskAccessException dae) {
+ CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion();
+ if (cancelCriterion.isCancelInProgress()) {
+ needToElectPrimary = false;
+ cancelCriterion.checkCancelInProgress(dae);
+ }
+
+ throw dae;
} catch (CancelException | RegionDestroyedException e) {
// We don't need to elect a primary if the cache was closed. The other members will
// take care of it. This ensures we don't compromise redundancy.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
index e71715d..ae1caaf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
@@ -339,9 +340,9 @@ public class CreateBucketMessage extends PartitionMessage {
waitForRepliesUninterruptibly();
} catch (ReplyException e) {
Throwable t = e.getCause();
- if (t instanceof CancelException) {
+ if (t instanceof DiskAccessException || t instanceof CancelException) {
logger.debug(
- "NodeResponse got remote cancellation, throwing PartitionedRegionCommunication Exception {}",
+ "NodeResponse got remote exception, throwing PartitionedRegionCommunication Exception {}",
t.getMessage(), t);
return null;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index 0744a35..48352a7 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -16,16 +16,21 @@ package org.apache.geode.internal.cache;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@@ -45,10 +50,13 @@ import org.mockito.stubbing.Answer;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
import org.apache.geode.internal.cache.partitioned.LoadProbe;
import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
@@ -248,6 +256,70 @@ public class PRHARedundancyProviderTest {
}
@Test
+ public void createBucketAtomicallyConvertsDiskAccessExceptionWhenCacheCloseInProgress() {
+ String partitionName = "partitionName";
+ DiskAccessException diskAccessException = new DiskAccessException("boom");
+ CacheClosedException cacheClosedException = new CacheClosedException(diskAccessException);
+ InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+ Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
+ InternalCache internalCache = mock(InternalCache.class);
+ RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+ Bucket bucket = mock(Bucket.class);
+ BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+ prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
+
+ when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+ when(partitionedRegion.getCache()).thenReturn(internalCache);
+ when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
+ when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
+ when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
+ when(internalCache.isCacheAtShutdownAll())
+ .thenReturn(Boolean.FALSE)
+ .thenThrow(diskAccessException);
+ when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.TRUE);
+ doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(diskAccessException);
+ when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
+ .thenReturn(memberSet);
+
+ assertThatThrownBy(
+ () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
+ .isEqualTo(cacheClosedException);
+ }
+
+ @Test
+ public void createBucketAtomicallyPropagatesDiskAccessExceptionWhenCacheCloseNotInProgress() {
+ String partitionName = "partitionName";
+ DiskAccessException diskAccessException = new DiskAccessException("boom");
+ InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+ Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
+ InternalCache internalCache = mock(InternalCache.class);
+ RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+ Bucket bucket = mock(Bucket.class);
+ BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+ prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
+
+ when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+ when(partitionedRegion.getCache()).thenReturn(internalCache);
+ when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
+ when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
+ when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
+ when(internalCache.isCacheAtShutdownAll())
+ .thenReturn(Boolean.FALSE)
+ .thenThrow(diskAccessException);
+ when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.FALSE);
+ when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
+ .thenReturn(memberSet);
+
+ assertThatThrownBy(
+ () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
+ .isEqualTo(diskAccessException);
+ }
+
+ @Test
@Parameters({"RUNTIME", "CANCEL", "REGION_DESTROYED"})
@TestCaseName("{method}[{index}]: {params}")
public void startTaskCompletesExceptionallyIfExceptionIsThrown(