You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jm...@apache.org on 2022/02/17 12:58:58 UTC

[geode] branch support/1.15 updated: GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7374)

This is an automated email from the ASF dual-hosted git repository.

jmelchior pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new dee8335  GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7374)
dee8335 is described below

commit dee83353ce4317062535250c3ae9cb830358e202
Author: Joris Melchior <jo...@gmail.com>
AuthorDate: Thu Feb 17 07:56:17 2022 -0500

    GEODE-9990: turn DiskAccessException into CacheClosedException (#7334) (#7374)
    
    * GEODE-9990: turn DiskAccessException into CacheClosedException
    
    - throw CacheClosedException instead of DiskAccessException when
      DiskInitFile is in closed state and DiskStoreImpl is closed or closing
    - catch DiskAccessException in PRHARedundancyProvider and turn it into
      CacheClosedException if cache closing is in progress
    - change CreateBucketMessage to handle DiskAccessException as cause of
      ReplyException
    
    (cherry picked from commit a98197b5d0a3a2547e0581e475dcabaa82e6e92f)
---
 .../internal/cache/DiskInitFileJUnitTest.java      | 72 ++++++++++++++++++++++
 .../apache/geode/internal/cache/DiskInitFile.java  | 25 ++++++--
 .../internal/cache/PRHARedundancyProvider.java     | 10 +++
 .../cache/partitioned/CreateBucketMessage.java     |  5 +-
 .../internal/cache/PRHARedundancyProviderTest.java | 72 ++++++++++++++++++++++
 5 files changed, 176 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
index 927616d..56b3e55 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/DiskInitFileJUnitTest.java
@@ -15,10 +15,15 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -31,8 +36,11 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.Statistics;
 import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.internal.cache.persistence.DiskRegionView;
 import org.apache.geode.internal.cache.persistence.DiskStoreID;
 
@@ -134,4 +142,68 @@ public class DiskInitFileJUnitTest {
     assertThat(dif.hasKrf(2)).isFalse();
     dif.destroy();
   }
+
+  @Test
+  public void markInitializedThrowsDiskAccessExceptionWhenInitFileClosedAndParentAndCacheNotClosing() {
+    markInitializedTestSetup();
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
+        DiskAccessException.class);
+  }
+
+  @Test
+  public void markInitializedThrowsCacheClosedExceptionWhenInitFileClosedAndParentIsClosedOrClosing() {
+    markInitializedTestSetup();
+    when(mockedDiskStoreImpl.isClosed()).thenReturn(Boolean.TRUE);
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isInstanceOf(
+        CacheClosedException.class);
+  }
+
+  @Test
+  public void markInitializedThrowsCacheClosedExceptionWhenCacheIsClosing() {
+    CancelCriterion cancelCriterion = markInitializedTestSetup();
+    CacheClosedException cacheClosedException = new CacheClosedException("boom");
+    doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress();
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView)).isEqualTo(
+        cacheClosedException);
+  }
+
+  @Test
+  public void markInitializedCacheCloseIsCalledWhenParentHandlesDiskAccessException() {
+    markInitializedTestSetup();
+
+    DiskInitFile diskInitFile =
+        new DiskInitFile("testThrows", mockedDiskStoreImpl, false, Collections.emptySet());
+    diskInitFile.close();
+
+    assertThatThrownBy(() -> diskInitFile.markInitialized(mockDiskRegionView))
+        .isInstanceOf(DiskAccessException.class);
+    verify(mockedDiskStoreImpl, times(1)).handleDiskAccessException(any(DiskAccessException.class));
+  }
+
+  private CancelCriterion markInitializedTestSetup() {
+    InternalCache internalCache = mock(InternalCache.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    DiskRegion diskRegion = mock(DiskRegion.class);
+
+    when(mockedDiskStoreImpl.getCache()).thenReturn(internalCache);
+    when(mockedDiskStoreImpl.getById(anyLong())).thenReturn(diskRegion);
+    when(internalCache.getCancelCriterion()).thenReturn(cancelCriterion);
+
+    return cancelCriterion;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
index a9651e5..c6baad2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
@@ -51,6 +51,7 @@ import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.Instantiator;
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAlgorithm;
@@ -1307,11 +1308,24 @@ public class DiskInitFile implements DiskInitFileInterpreter {
     writeIFRecord(bb, true);
   }
 
-  private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {
-    assert lock.isHeldByCurrentThread();
+  private void checkClosed() {
     if (closed) {
-      throw new DiskAccessException("The disk store is closed", parent);
+      parent.getCache().getCancelCriterion().checkCancelInProgress();
+
+      if (parent.isClosed() || parent.isClosing()) {
+        throw new CacheClosedException("The disk store is closed or closing");
+      }
+
+      DiskAccessException dae = new DiskAccessException("The disk init file is closed", parent);
+      parent.handleDiskAccessException(dae);
+
+      throw dae;
     }
+  }
+
+  private void writeIFRecord(ByteBuffer bb, boolean doStats) throws IOException {
+    assert lock.isHeldByCurrentThread();
+    checkClosed();
 
     ifRAF.write(bb.array(), 0, bb.position());
     if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
@@ -1327,9 +1341,8 @@ public class DiskInitFile implements DiskInitFileInterpreter {
 
   private void writeIFRecord(HeapDataOutputStream hdos, boolean doStats) throws IOException {
     assert lock.isHeldByCurrentThread();
-    if (closed) {
-      throw new DiskAccessException("The disk store is closed", parent);
-    }
+    checkClosed();
+
     hdos.sendTo(ifRAF);
     if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
       logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "DiskInitFile writeIFRecord HDOS");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
index e07ab55..0e24e0f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PRHARedundancyProvider.java
@@ -43,11 +43,13 @@ import java.util.function.BiFunction;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.annotations.Immutable;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
@@ -726,6 +728,14 @@ public class PRHARedundancyProvider {
             return bucketPrimary;
           }
         }
+      } catch (DiskAccessException dae) {
+        CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion();
+        if (cancelCriterion.isCancelInProgress()) {
+          needToElectPrimary = false;
+          cancelCriterion.checkCancelInProgress(dae);
+        }
+
+        throw dae;
       } catch (CancelException | RegionDestroyedException e) {
         // We don't need to elect a primary if the cache was closed. The other members will
         // take care of it. This ensures we don't compromise redundancy.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
index e71715d..ae1caaf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/CreateBucketMessage.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.PartitionedRegionStorageException;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -339,9 +340,9 @@ public class CreateBucketMessage extends PartitionMessage {
         waitForRepliesUninterruptibly();
       } catch (ReplyException e) {
         Throwable t = e.getCause();
-        if (t instanceof CancelException) {
+        if (t instanceof DiskAccessException || t instanceof CancelException) {
           logger.debug(
-              "NodeResponse got remote cancellation, throwing PartitionedRegionCommunication Exception {}",
+              "NodeResponse got remote exception, throwing PartitionedRegionCommunication Exception {}",
               t.getMessage(), t);
           return null;
         }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
index 6cd69b7..7cc089e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PRHARedundancyProviderTest.java
@@ -16,16 +16,21 @@ package org.apache.geode.internal.cache;
 
 import static org.apache.geode.cache.Region.SEPARATOR;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -44,10 +49,13 @@ import org.mockito.stubbing.Answer;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskAccessException;
 import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.internal.cache.partitioned.Bucket;
 import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
 import org.apache.geode.internal.cache.partitioned.LoadProbe;
 import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
@@ -248,6 +256,70 @@ public class PRHARedundancyProviderTest {
   }
 
   @Test
+  public void createBucketAtomicallyConvertsDiskAccessExceptionWhenCacheCloseInProgress() {
+    String partitionName = "partitionName";
+    DiskAccessException diskAccessException = new DiskAccessException("boom");
+    CacheClosedException cacheClosedException = new CacheClosedException(diskAccessException);
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Bucket bucket = mock(Bucket.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
+
+    when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+    when(partitionedRegion.getCache()).thenReturn(internalCache);
+    when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
+    when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
+    when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
+    when(internalCache.isCacheAtShutdownAll())
+        .thenReturn(Boolean.FALSE)
+        .thenThrow(diskAccessException);
+    when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.TRUE);
+    doThrow(cacheClosedException).when(cancelCriterion).checkCancelInProgress(diskAccessException);
+    when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
+        .thenReturn(memberSet);
+
+    assertThatThrownBy(
+        () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
+            .isEqualTo(cacheClosedException);
+  }
+
+  @Test
+  public void createBucketAtomicallyPropagatesDiskAccessExceptionWhenCacheCloseNotInProgress() {
+    String partitionName = "partitionName";
+    DiskAccessException diskAccessException = new DiskAccessException("boom");
+    InternalDistributedMember internalDistributedMember = mock(InternalDistributedMember.class);
+    Set<InternalDistributedMember> memberSet = Collections.singleton(internalDistributedMember);
+    InternalCache internalCache = mock(InternalCache.class);
+    RegionAdvisor regionAdvisor = mock(RegionAdvisor.class);
+    Bucket bucket = mock(Bucket.class);
+    BucketAdvisor bucketAdvisor = mock(BucketAdvisor.class);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+    prHaRedundancyProvider = new PRHARedundancyProvider(partitionedRegion, resourceManager);
+
+    when(partitionedRegion.getRegionAdvisor()).thenReturn(regionAdvisor);
+    when(partitionedRegion.getCache()).thenReturn(internalCache);
+    when(partitionedRegion.getCancelCriterion()).thenReturn(cancelCriterion);
+    when(regionAdvisor.getBucket(anyInt())).thenReturn(bucket);
+    when(bucket.getBucketAdvisor()).thenReturn(bucketAdvisor);
+    when(internalCache.isCacheAtShutdownAll())
+        .thenReturn(Boolean.FALSE)
+        .thenThrow(diskAccessException);
+    when(cancelCriterion.isCancelInProgress()).thenReturn(Boolean.FALSE);
+    when(partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName))
+        .thenReturn(memberSet);
+
+    assertThatThrownBy(
+        () -> prHaRedundancyProvider.createBucketAtomically(1, 5000, false, partitionName))
+            .isEqualTo(diskAccessException);
+  }
+
+  @Test
   @Parameters({"RUNTIME", "CANCEL", "REGION_DESTROYED"})
   @TestCaseName("{method}[{index}]: {params}")
   public void startTaskCompletesExceptionallyIfExceptionIsThrown(