You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2020/10/02 19:28:50 UTC

[geode] 04/19: GEODE-7912: cacheWriter should be triggered when PR.clear (#4882)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9a0b0cbc116b49c2e9272b29b5f143b9bf5b99ae
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Mon Mar 30 19:34:35 2020 -0700

    GEODE-7912: cacheWriter should be triggered when PR.clear (#4882)
    
    
            Co-authored-by: Anil <ag...@pivotal.io>
            Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
---
 .../cache/PartitionedRegionClearDUnitTest.java     | 228 +++++++++++++++++----
 .../apache/geode/internal/cache/LocalRegion.java   |   4 +-
 .../geode/internal/cache/PartitionedRegion.java    |  56 +++--
 3 files changed, 223 insertions(+), 65 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
index fb2a81b..a5a22b9 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -20,6 +20,7 @@ import static org.apache.geode.test.dunit.rules.ClusterStartupRule.getClientCach
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
@@ -30,13 +31,15 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.cache.util.CacheWriterAdapter;
 import org.apache.geode.test.dunit.SerializableCallableIF;
 import org.apache.geode.test.dunit.rules.ClientVM;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
@@ -68,12 +71,6 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
         c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
     client2 = cluster.startClientVM(6,
         c -> c.withPoolSubscription(true).withLocatorConnection((locatorPort)));
-    dataStore1.invoke(this::initDataStore);
-    dataStore2.invoke(this::initDataStore);
-    dataStore3.invoke(this::initDataStore);
-    accessor.invoke(this::initAccessor);
-    client1.invoke(this::initClientCache);
-    client2.invoke(this::initClientCache);
   }
 
   protected RegionShortcut getRegionShortCut() {
@@ -104,14 +101,18 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
     region.registerInterestForAllKeys(InterestResultPolicy.KEYS);
   }
 
-  private void initDataStore() {
-    getCache().createRegionFactory(getRegionShortCut())
-        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
-        .addCacheListener(new CountingCacheListener())
-        .create(REGION_NAME);
+  private void initDataStore(boolean withWriter) {
+    RegionFactory factory = getCache().createRegionFactory(getRegionShortCut())
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create());
+    if (withWriter) {
+      factory.setCacheWriter(new CountingCacheWriter());
+    }
+    factory.create(REGION_NAME);
+    clearsByRegion = new HashMap<>();
+    destroysByRegion = new HashMap<>();
   }
 
-  private void initAccessor() {
+  private void initAccessor(boolean withWriter) {
     RegionShortcut shortcut = getRegionShortCut();
     if (shortcut.isPersistent()) {
       if (shortcut == RegionShortcut.PARTITION_PERSISTENT) {
@@ -126,12 +127,16 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
         fail("Wrong region type:" + shortcut);
       }
     }
-    getCache().createRegionFactory(shortcut)
+    RegionFactory factory = getCache().createRegionFactory(shortcut)
         .setPartitionAttributes(
             new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
-        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
-        .addCacheListener(new CountingCacheListener())
-        .create(REGION_NAME);
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create());
+    if (withWriter) {
+      factory.setCacheWriter(new CountingCacheWriter());
+    }
+    factory.create(REGION_NAME);
+    clearsByRegion = new HashMap<>();
+    destroysByRegion = new HashMap<>();
   }
 
   private void feed(boolean isClient) {
@@ -152,45 +157,148 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
     // client2.invoke(()->verifyRegionSize(true, expectedNum));
   }
 
-  private void verifyCacheListenerTriggerCount(MemberVM serverVM) {
-    SerializableCallableIF<Integer> getListenerTriggerCount = () -> {
-      CountingCacheListener countingCacheListener =
-          (CountingCacheListener) getRegion(false).getAttributes()
-              .getCacheListeners()[0];
-      return countingCacheListener.getClears();
-    };
+  SerializableCallableIF<Integer> getWriterClears = () -> {
+    int clears =
+        clearsByRegion.get(REGION_NAME) == null ? 0 : clearsByRegion.get(REGION_NAME).get();
+    return clears;
+  };
 
-    int count = accessor.invoke(getListenerTriggerCount)
-        + dataStore1.invoke(getListenerTriggerCount)
-        + dataStore2.invoke(getListenerTriggerCount)
-        + dataStore3.invoke(getListenerTriggerCount);
-    assertThat(count).isEqualTo(1);
+  SerializableCallableIF<Integer> getWriterDestroys = () -> {
+    int destroys =
+        destroysByRegion.get(REGION_NAME) == null ? 0 : destroysByRegion.get(REGION_NAME).get();
+    return destroys;
+  };
 
-    if (serverVM != null) {
-      assertThat(serverVM.invoke(getListenerTriggerCount)).isEqualTo(1);
-    }
+  void configureServers(boolean dataStoreWithWriter, boolean accessorWithWriter) {
+    dataStore1.invoke(() -> initDataStore(dataStoreWithWriter));
+    dataStore2.invoke(() -> initDataStore(dataStoreWithWriter));
+    dataStore3.invoke(() -> initDataStore(dataStoreWithWriter));
+    accessor.invoke(() -> initAccessor(accessorWithWriter));
+    // make sure only datastore3 has cacheWriter
+    dataStore1.invoke(() -> {
+      Region region = getRegion(false);
+      region.getAttributesMutator().setCacheWriter(null);
+    });
+    dataStore2.invoke(() -> {
+      Region region = getRegion(false);
+      region.getAttributesMutator().setCacheWriter(null);
+    });
+  }
+
+  @Test
+  public void normalClearFromDataStoreWithWriterOnDataStore() {
+    configureServers(true, true);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    dataStore3.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+
+    // do the region destroy to compare that the same callbacks will be triggered
+    dataStore3.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+        .isEqualTo(1);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+        .isEqualTo(0);
   }
 
   @Test
-  public void normalClearFromDataStore() {
+  public void normalClearFromDataStoreWithoutWriterOnDataStore() {
+    configureServers(false, true);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
     accessor.invoke(() -> feed(false));
     verifyServerRegionSize(NUM_ENTRIES);
     dataStore1.invoke(() -> getRegion(false).clear());
     verifyServerRegionSize(0);
-    verifyCacheListenerTriggerCount(dataStore1);
+
+    // do the region destroy to compare that the same callbacks will be triggered
+    dataStore1.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+        .isEqualTo(1);
   }
 
   @Test
-  public void normalClearFromAccessor() {
+  public void normalClearFromAccessorWithWriterOnDataStore() {
+    configureServers(true, true);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
     accessor.invoke(() -> feed(false));
     verifyServerRegionSize(NUM_ENTRIES);
     accessor.invoke(() -> getRegion(false).clear());
     verifyServerRegionSize(0);
-    verifyCacheListenerTriggerCount(accessor);
+
+    // do the region destroy to compare that the same callbacks will be triggered
+    accessor.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+        .isEqualTo(1);
+  }
+
+  @Test
+  public void normalClearFromAccessorWithoutWriterButWithWriterOnDataStore() {
+    configureServers(true, false);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
+    accessor.invoke(() -> feed(false));
+    verifyServerRegionSize(NUM_ENTRIES);
+    accessor.invoke(() -> getRegion(false).clear());
+    verifyServerRegionSize(0);
+
+    // do the region destroy to compare that the same callbacks will be triggered
+    accessor.invoke(() -> {
+      Region region = getRegion(false);
+      region.destroyRegion();
+    });
+
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+        .isEqualTo(1);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+        .isEqualTo(0);
   }
 
   @Test
   public void normalClearFromClient() {
+    configureServers(true, false);
+    client1.invoke(this::initClientCache);
+    client2.invoke(this::initClientCache);
+
     client1.invoke(() -> feed(true));
     verifyClientRegionSize(NUM_ENTRIES);
     verifyServerRegionSize(NUM_ENTRIES);
@@ -198,21 +306,53 @@ public class PartitionedRegionClearDUnitTest implements Serializable {
     client1.invoke(() -> getRegion(true).clear());
     verifyServerRegionSize(0);
     verifyClientRegionSize(0);
-    verifyCacheListenerTriggerCount(null);
+
+    // do the region destroy to compare that the same callbacks will be triggered
+    client1.invoke(() -> {
+      Region region = getRegion(true);
+      region.destroyRegion();
+    });
+
+    assertThat(dataStore1.invoke(getWriterDestroys)).isEqualTo(dataStore1.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore2.invoke(getWriterDestroys)).isEqualTo(dataStore2.invoke(getWriterClears))
+        .isEqualTo(0);
+    assertThat(dataStore3.invoke(getWriterDestroys)).isEqualTo(dataStore3.invoke(getWriterClears))
+        .isEqualTo(1);
+    assertThat(accessor.invoke(getWriterDestroys)).isEqualTo(accessor.invoke(getWriterClears))
+        .isEqualTo(0);
   }
 
-  private static class CountingCacheListener extends CacheListenerAdapter {
-    private final AtomicInteger clears = new AtomicInteger();
+  public static HashMap<String, AtomicInteger> clearsByRegion = new HashMap<>();
+  public static HashMap<String, AtomicInteger> destroysByRegion = new HashMap<>();
 
+  private static class CountingCacheWriter extends CacheWriterAdapter {
     @Override
-    public void afterRegionClear(RegionEvent event) {
+    public void beforeRegionClear(RegionEvent event) throws CacheWriterException {
       Region region = event.getRegion();
-      logger.info("Region " + region.getFullPath() + " is cleared.");
-      clears.incrementAndGet();
+      AtomicInteger clears = clearsByRegion.get(region.getName());
+      if (clears == null) {
+        clears = new AtomicInteger(1);
+        clearsByRegion.put(region.getName(), clears);
+      } else {
+        clears.incrementAndGet();
+      }
+      logger
+          .info("Region " + region.getName() + " will be cleared, clear count is:" + clears.get());
     }
 
-    int getClears() {
-      return clears.get();
+    @Override
+    public void beforeRegionDestroy(RegionEvent event) throws CacheWriterException {
+      Region region = event.getRegion();
+      AtomicInteger destroys = destroysByRegion.get(region.getName());
+      if (destroys == null) {
+        destroys = new AtomicInteger(1);
+        destroysByRegion.put(region.getName(), destroys);
+      } else {
+        destroys.incrementAndGet();
+      }
+      logger.info(
+          "Region " + region.getName() + " will be destroyed, destroy count is:" + destroys.get());
     }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 24ab6a2..fdec095 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -3000,7 +3000,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   /**
    * @since GemFire 5.7
    */
-  private void serverRegionClear(RegionEventImpl regionEvent) {
+  protected void serverRegionClear(RegionEventImpl regionEvent) {
     if (regionEvent.getOperation().isDistributed()) {
       ServerRegionProxy mySRP = getServerProxy();
       if (mySRP != null) {
@@ -3119,7 +3119,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     return result;
   }
 
-  private void cacheWriteBeforeRegionClear(RegionEventImpl event)
+  void cacheWriteBeforeRegionClear(RegionEventImpl event)
       throws CacheWriterException, TimeoutException {
     // copy into local var to prevent race condition
     CacheWriter writer = basicGetWriter();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 01c351b..b829a83 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -2190,6 +2190,9 @@ public class PartitionedRegion extends LocalRegion
           throw cache.getCacheClosedException("Cache is shutting down");
         }
 
+        // do cacheWrite
+        cacheWriteBeforeRegionClear(regionEvent);
+
         // create ClearPRMessage per bucket
         List<ClearPRMessage> clearMsgList = createClearPRMessages(regionEvent.getEventId());
         for (ClearPRMessage clearPRMessage : clearMsgList) {
@@ -4455,6 +4458,26 @@ public class PartitionedRegion extends LocalRegion
     return null;
   }
 
+  boolean triggerWriter(RegionEventImpl event, SearchLoadAndWriteProcessor processor, int paction,
+      String theKey) {
+    CacheWriter localWriter = basicGetWriter();
+    Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null;
+
+    if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) {
+      return false;
+    }
+
+    final long start = getCachePerfStats().startCacheWriterCall();
+    try {
+      processor.initialize(this, theKey, null);
+      processor.doNetWrite(event, netWriteRecipients, localWriter, paction);
+      processor.release();
+    } finally {
+      getCachePerfStats().endCacheWriterCall(start);
+    }
+    return true;
+  }
+
   /**
    * This invokes a cache writer before a destroy operation. Although it has the same method
    * signature as the method in LocalRegion, it is invoked in a different code path. LocalRegion
@@ -4464,31 +4487,26 @@ public class PartitionedRegion extends LocalRegion
   @Override
   boolean cacheWriteBeforeRegionDestroy(RegionEventImpl event)
       throws CacheWriterException, TimeoutException {
-
     if (event.getOperation().isDistributed()) {
       serverRegionDestroy(event);
-      CacheWriter localWriter = basicGetWriter();
-      Set netWriteRecipients = localWriter == null ? this.distAdvisor.adviseNetWrite() : null;
-
-      if (localWriter == null && (netWriteRecipients == null || netWriteRecipients.isEmpty())) {
-        return false;
-      }
-
-      final long start = getCachePerfStats().startCacheWriterCall();
-      try {
-        SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
-        processor.initialize(this, "preDestroyRegion", null);
-        processor.doNetWrite(event, netWriteRecipients, localWriter,
-            SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY);
-        processor.release();
-      } finally {
-        getCachePerfStats().endCacheWriterCall(start);
-      }
-      return true;
+      SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+      return triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONDESTROY,
+          "preDestroyRegion");
     }
     return false;
   }
 
+  @Override
+  void cacheWriteBeforeRegionClear(RegionEventImpl event)
+      throws CacheWriterException, TimeoutException {
+    if (event.getOperation().isDistributed()) {
+      serverRegionClear(event);
+      SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+      triggerWriter(event, processor, SearchLoadAndWriteProcessor.BEFOREREGIONCLEAR,
+          "preClearRegion");
+    }
+  }
+
   /**
    * Test Method: Get the DistributedMember identifier for the vm containing a key
    *