You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2018/11/15 20:49:00 UTC

[geode] branch develop updated: GEODE-4712 GEODE-5943: shut down the bucketSorter when destroying the partitioned region (#2845)

This is an automated email from the ASF dual-hosted git repository.

jinmeiliao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3f4474c  GEODE-4712 GEODE-5943: shut down the bucketSorter when destroying the partitioned region (#2845)
3f4474c is described below

commit 3f4474cfe2a6b7bb6b20ad31e8be1e2ebff55f45
Author: jinmeiliao <ji...@pivotal.io>
AuthorDate: Thu Nov 15 12:48:51 2018 -0800

    GEODE-4712 GEODE-5943: shut down the bucketSorter when destroying the partitioned region (#2845)
---
 .../internal/cache/eviction/EvictionDUnitTest.java |  8 ++--
 .../cache/PartitionedRegionIntegrationTest.java    | 43 ++++++++++++++++++++++
 .../geode/internal/cache/PartitionedRegion.java    | 15 ++++++--
 .../apache/geode/test/junit/rules/VMProvider.java  |  9 +++++
 4 files changed, 67 insertions(+), 8 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java
index ec4e022..eb929a1 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/eviction/EvictionDUnitTest.java
@@ -89,7 +89,7 @@ public class EvictionDUnitTest {
     server1 = cluster.startServerVM(1, s -> s.withNoCacheServer()
         .withProperties(properties).withConnectionToLocator(locatorPort));
 
-    VMProvider.invokeInEveryMember(() -> {
+    VMProvider.invokeInEveryMember("setup VM", () -> {
       HeapMemoryMonitor.setTestDisableMemoryUpdates(true);
       System.setProperty("gemfire.memoryEventTolerance", "0");
       InternalCache cache = ClusterStartupRule.getCache();
@@ -104,7 +104,7 @@ public class EvictionDUnitTest {
 
   @Test
   public void testDummyInlineNCentralizedEviction() {
-    VMProvider.invokeInEveryMember(() -> {
+    VMProvider.invokeInEveryMember("create region", () -> {
       ServerStarterRule server = (ServerStarterRule) ClusterStartupRule.memberStarter;
       server.createPartitionRegion("PR1",
           f -> f.setOffHeap(offHeap).setEvictionAttributes(
@@ -113,7 +113,7 @@ public class EvictionDUnitTest {
 
     }, server0, server1);
 
-    server0.invoke(() -> {
+    server0.invoke("put data", () -> {
       Region region = ClusterStartupRule.getCache().getRegion("PR1");
       for (int counter = 1; counter <= 50; counter++) {
         region.put(counter, new byte[ENTRY_SIZE]);
@@ -124,7 +124,7 @@ public class EvictionDUnitTest {
     int server1ExpectedEviction = server1.invoke(() -> sendEventAndWaitForExpectedEviction("PR1"));
 
     // do 4 puts again in PR1
-    server0.invoke(() -> {
+    server0.invoke("put more data", () -> {
       Region region = ClusterStartupRule.getCache().getRegion("PR1");
       for (int counter = 1; counter <= 4; counter++) {
         region.put(counter, new byte[ENTRY_SIZE]);
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
new file mode 100644
index 0000000..4b20abc
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionIntegrationTest.java
@@ -0,0 +1,43 @@
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+public class PartitionedRegionIntegrationTest {
+
+  @Rule
+  public ServerStarterRule server = new ServerStarterRule().withNoCacheServer().withAutoStart();
+
+  @Test
+  public void bucketSorterShutdownAfterRegionDestroy() {
+    PartitionedRegion region =
+        (PartitionedRegion) server.createRegion(RegionShortcut.PARTITION, "PR1",
+            f -> f.setEvictionAttributes(
+                EvictionAttributes.createLRUHeapAttributes(null, EvictionAction.LOCAL_DESTROY)));
+
+    ScheduledExecutorService bucketSorter = region.getBucketSorter();
+    assertThat(bucketSorter).isNotNull();
+
+    region.destroyRegion();
+
+    assertThat(bucketSorter.isShutdown()).isTrue();
+  }
+
+  @Test
+  public void bucketSorterIsNotCreatedIfNoEviction() {
+    PartitionedRegion region =
+        (PartitionedRegion) server.createRegion(RegionShortcut.PARTITION, "PR1",
+            rf -> rf.setOffHeap(false));
+    ScheduledExecutorService bucketSorter = region.getBucketSorter();
+    assertThat(bucketSorter).isNull();
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index f3e86d2..72c3659 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -467,6 +467,9 @@ public class PartitionedRegion extends LocalRegion
     colocationListeners.remove(colocationListener);
   }
 
+  ScheduledExecutorService getBucketSorter() {
+    return bucketSorter;
+  }
 
   static PRIdMap getPrIdToPR() {
     return prIdToPR;
@@ -7628,6 +7631,10 @@ public class PartitionedRegion extends LocalRegion
       colocatedWithRegion.getColocatedByList().remove(this);
     }
 
+    if (bucketSorter != null) {
+      bucketSorter.shutdown();
+    }
+
     RegionLogger.logDestroy(getName(),
         this.cache.getInternalDistributedSystem().getDistributedMember(), null, op.isClose());
   }
@@ -9243,11 +9250,11 @@ public class PartitionedRegion extends LocalRegion
   public List<BucketRegion> getSortedBuckets() {
     if (!bucketSorterStarted.get()) {
       bucketSorterStarted.set(true);
-      this.bucketSorter.scheduleAtFixedRate(new BucketSorterThread(), 0,
+      this.bucketSorter.scheduleAtFixedRate(new BucketSorterRunnable(), 0,
           HeapEvictor.BUCKET_SORTING_INTERVAL, TimeUnit.MILLISECONDS);
       if (logger.isDebugEnabled()) {
         logger.debug(
-            "Started BucketSorter to sort the buckets according to numver of entries in each bucket for every {} milliseconds",
+            "Started BucketSorter to sort the buckets according to number of entries in each bucket for every {} milliseconds",
             HeapEvictor.BUCKET_SORTING_INTERVAL);
       }
     }
@@ -9259,7 +9266,7 @@ public class PartitionedRegion extends LocalRegion
     return bucketList;
   }
 
-  class BucketSorterThread implements Runnable {
+  class BucketSorterRunnable implements Runnable {
     @Override
     public void run() {
       try {
@@ -9290,7 +9297,7 @@ public class PartitionedRegion extends LocalRegion
         }
       } catch (Exception e) {
         if (logger.isDebugEnabled()) {
-          logger.debug("BucketSorterThread : encountered Exception ", e);
+          logger.debug("BucketSorterRunnable : encountered Exception ", e);
         }
       }
     }
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java
index b68699d..de52d9e 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/VMProvider.java
@@ -37,6 +37,15 @@ public abstract class VMProvider {
     Arrays.stream(members).forEach(member -> member.invoke(runnableIF));
   }
 
+  public static void invokeInEveryMember(String name, SerializableRunnableIF runnableIF,
+      VMProvider... members) {
+    if (ArrayUtils.isEmpty(members)) {
+      throw new IllegalArgumentException("Array of members must not be null nor empty.");
+    }
+
+    Arrays.stream(members).forEach(member -> member.invoke(name, runnableIF));
+  }
+
   public abstract VM getVM();
 
   public void stop() {