You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nr...@apache.org on 2018/03/26 22:01:12 UTC

[geode] branch develop updated: GEODE-4933: Include members with NORMAL and PRELOADED data policy in … (#1676)

This is an automated email from the ASF dual-hosted git repository.

nreich pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b84d103  GEODE-4933: Include members with NORMAL and PRELOADED data policy in … (#1676)
b84d103 is described below

commit b84d10301be4cc41872138531a0cb414d4b04b98
Author: Nick Reich <nr...@pivotal.io>
AuthorDate: Mon Mar 26 15:01:08 2018 -0700

    GEODE-4933: Include members with NORMAL and PRELOADED data policy in … (#1676)
    
    * GEODE-4933: Include members with NORMAL and PRELOADED data policy in expiration
    
      * If the data policy for a replicate region is PRELOADED, for example, if
        eviction is used and local destroy is the eviction action, that member was
        being excluded from the set of members to get last access time before
        expiration. Those members are now included.
    
      * Members that were caching proxies, thus having the NORMAL data policy,
        were also being excluded and are not included in the last access time
        lookup.
---
 .../cache/LatestLastAccessTimeOperation.java       |   2 +-
 .../PREntryIdleExpirationDistributedTest.java      |  97 ++++++++++++++++--
 ...eplicateEntryIdleExpirationDistributedTest.java | 109 +++++++++++++++++++--
 3 files changed, 193 insertions(+), 15 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeOperation.java
index 7c67eb8..a9d8e29 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LatestLastAccessTimeOperation.java
@@ -38,7 +38,7 @@ public class LatestLastAccessTimeOperation<K> {
 
   public long getLatestLastAccessTime() {
     final Set<InternalDistributedMember> recipients =
-        this.region.getCacheDistributionAdvisor().adviseInitializedReplicates();
+        this.region.getCacheDistributionAdvisor().adviseNetSearch();
     final DistributionManager dm = this.region.getDistributionManager();
     dm.retainMembersWithSameOrNewerVersion(recipients, Version.GEODE_140);
     final LatestLastAccessTimeReplyProcessor replyProcessor =
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PREntryIdleExpirationDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PREntryIdleExpirationDistributedTest.java
index acbc37d..46c538e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PREntryIdleExpirationDistributedTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PREntryIdleExpirationDistributedTest.java
@@ -14,25 +14,65 @@
  */
 package org.apache.geode.internal.cache;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.geode.cache.ExpirationAction.DESTROY;
 import static org.apache.geode.cache.RegionShortcut.PARTITION;
+import static org.apache.geode.test.dunit.Host.getHost;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.base.Stopwatch;
+import org.junit.After;
 import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
 
 @Category(DistributedTest.class)
-@SuppressWarnings("serial")
-public class PREntryIdleExpirationDistributedTest
-    extends ReplicateEntryIdleExpirationDistributedTest {
+public class PREntryIdleExpirationDistributedTest implements Serializable {
+
+  @ClassRule
+  public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+  private static final AtomicBoolean KEEP_READING = new AtomicBoolean(true);
+
+  private static final String KEY = "KEY";
+  private static final String VALUE = "VALUE";
+
+  private final VM member1 = getHost(0).getVM(0);
+  private final VM member2 = getHost(0).getVM(1);
+  private final VM member3 = getHost(0).getVM(2);
+  private final String regionName = getClass().getSimpleName();
+
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheIn(member1).createCacheIn(member2)
+      .createCacheIn(member3).createCacheIn(getHost(0).getVM(3)).build();
 
   @Before
-  public void setUpPREntryIdleExpirationTest() throws Exception {
+  public void setUp() throws Exception {
+    VM[] vms = new VM[] {member1, member2, member3};
+    for (VM vm : vms) {
+      vm.invoke(() -> {
+        KEEP_READING.set(true);
+        ExpiryTask.suspendExpiration();
+        createRegion();
+      });
+    }
+
     // make member1 the primary bucket for KEY
     member1.invoke(() -> {
       Region<String, String> region = cacheRule.getCache().getRegion(regionName);
@@ -40,8 +80,53 @@ public class PREntryIdleExpirationDistributedTest
     });
   }
 
-  @Override
-  protected void createRegion() {
+  @After
+  public void tearDown() throws Exception {
+    VM[] vms = new VM[] {member1, member2, member3};
+    for (VM vm : vms) {
+      vm.invoke(() -> {
+        KEEP_READING.set(false);
+        ExpiryTask.permitExpiration();
+      });
+    }
+  }
+
+  @Test
+  public void readsInOtherMemberShouldPreventExpiration() throws Exception {
+    AsyncInvocation<?> memberReading = member3.invokeAsync(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+      region.put(KEY, VALUE);
+      while (KEEP_READING.get()) {
+        region.get(KEY);
+        Thread.sleep(10);
+      }
+    });
+
+    member2.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+      await().atMost(30, SECONDS).until(() -> region.containsKey(KEY));
+      assertThat(region.containsKey(KEY)).isTrue();
+    });
+
+    member1.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+      await().atMost(30, SECONDS).until(() -> region.containsKey(KEY));
+      assertThat(region.containsKey(KEY)).isTrue();
+
+      ExpiryTask.permitExpiration();
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      while (stopwatch.elapsed(SECONDS) <= 5 && region.containsKey(KEY)) {
+        Thread.sleep(10);
+      }
+      assertThat(region.containsKey(KEY)).isTrue();
+    });
+
+    member3.invoke(() -> KEEP_READING.set(false));
+
+    memberReading.await();
+  }
+
+  private void createRegion() {
     RegionFactory<String, String> factory = cacheRule.getCache().createRegionFactory(PARTITION);
     factory.setPartitionAttributes(
         new PartitionAttributesFactory<String, String>().setRedundantCopies(2).create());
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ReplicateEntryIdleExpirationDistributedTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ReplicateEntryIdleExpirationDistributedTest.java
index e1d32ce..afac1a3 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ReplicateEntryIdleExpirationDistributedTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ReplicateEntryIdleExpirationDistributedTest.java
@@ -32,9 +32,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.ExpirationAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
@@ -42,7 +45,6 @@ import org.apache.geode.test.dunit.rules.DistributedTestRule;
 import org.apache.geode.test.junit.categories.DistributedTest;
 
 @Category(DistributedTest.class)
-@SuppressWarnings("serial")
 public class ReplicateEntryIdleExpirationDistributedTest implements Serializable {
 
   @ClassRule
@@ -50,17 +52,17 @@ public class ReplicateEntryIdleExpirationDistributedTest implements Serializable
 
   private static final AtomicBoolean KEEP_READING = new AtomicBoolean(true);
 
-  protected static final String KEY = "KEY";
-  protected static final String VALUE = "VALUE";
+  private static final String KEY = "KEY";
+  private static final String VALUE = "VALUE";
 
-  protected final VM member1 = getHost(0).getVM(0);
-  protected final VM member2 = getHost(0).getVM(1);
-  protected final VM member3 = getHost(0).getVM(2);
-  protected final String regionName = getClass().getSimpleName();
+  private final VM member1 = getHost(0).getVM(0);
+  private final VM member2 = getHost(0).getVM(1);
+  private final VM member3 = getHost(0).getVM(2);
+  private final String regionName = getClass().getSimpleName();
 
   @Rule
   public CacheRule cacheRule = CacheRule.builder().createCacheIn(member1).createCacheIn(member2)
-      .createCacheIn(member3).build();
+      .createCacheIn(member3).createCacheIn(getHost(0).getVM(3)).build();
 
   @Before
   public void setUp() throws Exception {
@@ -120,9 +122,100 @@ public class ReplicateEntryIdleExpirationDistributedTest implements Serializable
     memberReading.await();
   }
 
+  @Test
+  public void readsInNormalMemberShouldPreventExpiration() throws Exception {
+    VM member4 = getHost(0).getVM(3);
+    member4.invoke(() -> {
+      KEEP_READING.set(true);
+      ExpiryTask.suspendExpiration();
+
+      RegionFactory<String, String> factory = cacheRule.getCache().createRegionFactory();
+      factory.setDataPolicy(DataPolicy.NORMAL).setScope(Scope.DISTRIBUTED_ACK);
+      factory.setEntryIdleTimeout(new ExpirationAttributes(1, DESTROY));
+      factory.create(regionName);
+    });
+    AsyncInvocation<?> memberReading = member4.invokeAsync(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+      region.put(KEY, VALUE);
+      while (KEEP_READING.get()) {
+        region.get(KEY);
+        Thread.sleep(10);
+      }
+    });
+
+    member2.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+      await().atMost(30, SECONDS).until(() -> region.containsKey(KEY));
+      assertThat(region.containsKey(KEY)).isTrue();
+    });
+
+    member1.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(regionName);
+      await().atMost(30, SECONDS).until(() -> region.containsKey(KEY));
+      assertThat(region.containsKey(KEY)).isTrue();
+
+      ExpiryTask.permitExpiration();
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      while (stopwatch.elapsed(SECONDS) <= 5 && region.containsKey(KEY)) {
+        Thread.sleep(10);
+      }
+      assertThat(region.containsKey(KEY)).isTrue();
+    });
+
+    member4.invoke(() -> KEEP_READING.set(false));
+
+    memberReading.await();
+  }
+
+  @Test
+  public void readsInOtherMemberShouldPreventExpirationWhenEvictionEnabled() throws Exception {
+    String evictionRegionName = "evictionRegion";
+    member1.invoke(() -> createEvictionRegion(evictionRegionName));
+    member2.invoke(() -> createEvictionRegion(evictionRegionName));
+    member3.invoke(() -> createEvictionRegion(evictionRegionName));
+    AsyncInvocation<?> memberReading = member3.invokeAsync(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(evictionRegionName);
+      region.put(KEY, VALUE);
+      while (KEEP_READING.get()) {
+        region.get(KEY);
+        Thread.sleep(10);
+      }
+    });
+
+    member2.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(evictionRegionName);
+      await().atMost(30, SECONDS).until(() -> region.containsKey(KEY));
+      assertThat(region.containsKey(KEY)).isTrue();
+    });
+
+    member1.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(evictionRegionName);
+      await().atMost(30, SECONDS).until(() -> region.containsKey(KEY));
+      assertThat(region.containsKey(KEY)).isTrue();
+
+      ExpiryTask.permitExpiration();
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      while (stopwatch.elapsed(SECONDS) <= 5 && region.containsKey(KEY)) {
+        Thread.sleep(10);
+      }
+      assertThat(region.containsKey(KEY)).isTrue();
+    });
+
+    member3.invoke(() -> KEEP_READING.set(false));
+
+    memberReading.await();
+  }
+
   protected void createRegion() {
     RegionFactory<String, String> factory = cacheRule.getCache().createRegionFactory(REPLICATE);
     factory.setEntryIdleTimeout(new ExpirationAttributes(1, DESTROY));
     factory.create(regionName);
   }
+
+  private void createEvictionRegion(String regionName) {
+    RegionFactory<String, String> factory = cacheRule.getCache().createRegionFactory(REPLICATE);
+    factory.setEntryIdleTimeout(new ExpirationAttributes(1, DESTROY));
+    factory.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(100));
+    factory.create(regionName);
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
nreich@apache.org.