You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/03/07 19:21:51 UTC

[28/51] [abbrv] geode git commit: GEODE-2547: Interest registration no longer causes a CacheLoader to be invoked

GEODE-2547: Interest registration no longer causes a CacheLoader to be invoked


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1a36d36e
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1a36d36e
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1a36d36e

Branch: refs/heads/feature/GEM-1195
Commit: 1a36d36ec90d91094689cc3cb30c21be9b25276b
Parents: fb1fdf9
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Feb 28 13:43:50 2017 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Tue Feb 28 15:55:38 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/geode/cache/Operation.java  | 16 ++++
 .../geode/internal/cache/DistributedRegion.java | 30 ++++--
 .../org/apache/geode/internal/cache/OpType.java |  2 +
 .../cache/tier/sockets/BaseCommand.java         | 18 ++--
 .../tier/sockets/InterestListDUnitTest.java     | 99 ++++++++++++++++++++
 5 files changed, 148 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/cache/Operation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/Operation.java b/geode-core/src/main/java/org/apache/geode/cache/Operation.java
index 9b2227b..d835b6c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/Operation.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/Operation.java
@@ -49,6 +49,7 @@ public final class Operation implements java.io.Serializable {
   private static final byte OP_TYPE_CLEAR = OpType.CLEAR;
   private static final byte OP_TYPE_MARKER = OpType.MARKER;
   private static final byte OP_TYPE_UPDATE_VERSION = OpType.UPDATE_ENTRY_VERSION;
+  private static final byte OP_TYPE_GET_FOR_REGISTER_INTEREST = OpType.GET_FOR_REGISTER_INTEREST;
 
   private static final int OP_DETAILS_NONE = 0;
   private static final int OP_DETAILS_SEARCH = 1;
@@ -531,6 +532,14 @@ public final class Operation implements java.io.Serializable {
       false, // isRegion
       OP_TYPE_DESTROY, OP_DETAILS_REMOVEALL);
 
+  /**
+   * A 'get for register interest' operation.
+   */
+  public static final Operation GET_FOR_REGISTER_INTEREST =
+      new Operation("GET_FOR_REGISTER_INTEREST", false, // isLocal
+          false, // isRegion
+          OP_TYPE_GET_FOR_REGISTER_INTEREST, OP_DETAILS_NONE);
+
   /** The name of this mirror type. */
   private final transient String name;
 
@@ -636,6 +645,13 @@ public final class Operation implements java.io.Serializable {
   }
 
   /**
+   * Returns true if this operation is a get for register interest.
+   */
+  public boolean isGetForRegisterInterest() {
+    return this.opType == OP_TYPE_GET_FOR_REGISTER_INTEREST;
+  }
+
+  /**
    * Returns true if the operation invalidated an entry.
    */
   public boolean isInvalidate() {

http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index cc6ccf7..b9cdfd7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2302,17 +2302,27 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
         if (requestingClient != null) {
           event.setContext(requestingClient);
         }
-        SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
-        try {
-          processor.initialize(this, key, aCallbackArgument);
-          // processor fills in event
-          processor.doSearchAndLoad(event, txState, localValue);
-          if (clientEvent != null && clientEvent.getVersionTag() == null) {
-            clientEvent.setVersionTag(event.getVersionTag());
+        // If this event is because of a register interest call, don't invoke the CacheLoader
+        boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
+            && clientEvent.getOperation().isGetForRegisterInterest();
+        if (!getForRegisterInterest) {
+          SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+          try {
+            processor.initialize(this, key, aCallbackArgument);
+            // processor fills in event
+            processor.doSearchAndLoad(event, txState, localValue);
+            if (clientEvent != null && clientEvent.getVersionTag() == null) {
+              clientEvent.setVersionTag(event.getVersionTag());
+            }
+            lastModified = processor.getLastModified();
+          } finally {
+            processor.release();
+          }
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
+                + getFullPath() + "; key=" + key);
           }
-          lastModified = processor.getLastModified();
-        } finally {
-          processor.release();
         }
       }
       if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
index 7685988..ff36a57 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
@@ -49,6 +49,8 @@ public final class OpType {
 
   public static final byte UPDATE_ENTRY_VERSION = 11;
 
+  public static final byte GET_FOR_REGISTER_INTEREST = 12;
+
   public static final byte CLEAR = 16;
 
   public static final byte MARKER = 32;

http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 5379605..d217672 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -1074,7 +1074,7 @@ public abstract class BaseCommand implements Command {
 
     if (region != null) {
       if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
-        VersionTagHolder versionHolder = new VersionTagHolder();
+        VersionTagHolder versionHolder = createVersionTagHolder();
         ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
         // From Get70.getValueAndIsObject()
         Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true);
@@ -1161,7 +1161,7 @@ public abstract class BaseCommand implements Command {
       }
 
       for (Object key : region.keySet(true)) {
-        VersionTagHolder versionHolder = new VersionTagHolder();
+        VersionTagHolder versionHolder = createVersionTagHolder();
         if (keyPattern != null) {
           if (!(key instanceof String)) {
             // key is not a String, cannot apply regex to this entry
@@ -1263,12 +1263,10 @@ public abstract class BaseCommand implements Command {
   public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region,
       VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
       throws IOException {
-    Object key = null;
-    VersionTagHolder versionHolder = null;
     ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
     for (Iterator it = keySet.iterator(); it.hasNext();) {
-      key = it.next();
-      versionHolder = new VersionTagHolder();
+      Object key = it.next();
+      VersionTagHolder versionHolder = createVersionTagHolder();
 
       Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);
 
@@ -1454,7 +1452,7 @@ public abstract class BaseCommand implements Command {
       for (Iterator it = keyList.iterator(); it.hasNext();) {
         Object key = it.next();
         if (region.containsKey(key) || region.containsTombstone(key)) {
-          VersionTagHolder versionHolder = new VersionTagHolder();
+          VersionTagHolder versionHolder = createVersionTagHolder();
 
           ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
           data = region.get(key, null, true, true, true, id, versionHolder, true);
@@ -1475,6 +1473,12 @@ public abstract class BaseCommand implements Command {
     sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn);
   }
 
+  private static VersionTagHolder createVersionTagHolder() {
+    VersionTagHolder versionHolder = new VersionTagHolder();
+    versionHolder.setOperation(Operation.GET_FOR_REGISTER_INTEREST);
+    return versionHolder;
+  }
+
   /**
    * Append an interest response
    *

http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
index e5fb5fc..d8164f1 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
@@ -479,6 +479,80 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
     vm1.invoke(() -> InterestListDUnitTest.confirmNoCacheListenerInvalidates());
   }
 
+  @Test
+  public void testRegisterInterestSingleKeyWithDestroyOnReplicatedRegionWithCacheLoader() {
+    List keysToDestroy = new ArrayList();
+    keysToDestroy.add("0");
+    runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
+  }
+
+  @Test
+  public void testRegisterInterestSingleKeyWithDestroyOnPartitionedRegionWithCacheLoader() {
+    List keysToDestroy = new ArrayList();
+    keysToDestroy.add("0");
+    runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
+  }
+
+  @Test
+  public void testRegisterInterestListOfKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
+    List keysToDestroy = new ArrayList();
+    for (int i = 0; i < 5; i++) {
+      keysToDestroy.add(String.valueOf(i));
+    }
+    runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
+  }
+
+  @Test
+  public void testRegisterInterestListOfKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
+    List keysToDestroy = new ArrayList();
+    for (int i = 0; i < 5; i++) {
+      keysToDestroy.add(String.valueOf(i));
+    }
+    runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
+  }
+
+  @Test
+  public void testRegisterInterestAllKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
+    List keysToDestroy = new ArrayList();
+    keysToDestroy.add("0");
+    runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, "ALL_KEYS");
+  }
+
+  @Test
+  public void testRegisterInterestAllKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
+    List keysToDestroy = new ArrayList();
+    keysToDestroy.add("0");
+    runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, "ALL_KEYS");
+  }
+
+  private void runRegisterInterestWithDestroyAndCacheLoaderTest(boolean addReplicatedRegion,
+      List keysToDestroy, Object keyToRegister) {
+    // The server was already started with a replicated region. Bounce it if necessary
+    int port1 = PORT1;
+    if (!addReplicatedRegion) {
+      vm0.invoke(() -> closeCache());
+      port1 =
+          ((Integer) vm0.invoke(() -> InterestListDUnitTest.createServerCache(addReplicatedRegion)))
+              .intValue();
+    }
+    final int port = port1;
+
+    // Add a cache loader to the region
+    vm0.invoke(() -> addCacheLoader());
+
+    // Create client cache
+    vm1.invoke(() -> createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), port));
+
+    // Destroy appropriate key(s)
+    vm1.invoke(() -> destroyKeys(keysToDestroy));
+
+    // Register interest in appropriate keys(s)
+    vm1.invoke(() -> registerKey(keyToRegister));
+
+    // Verify CacheLoader was not invoked
+    vm0.invoke(() -> verifyNoCacheLoaderLoads());
+  }
+
   private void createCache(Properties props) throws Exception {
     DistributedSystem ds = getSystem(props);
     cache = CacheFactory.create(ds);
@@ -905,6 +979,20 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
     }
   }
 
+  private static void destroyKeys(List keys) {
+    Region r = cache.getRegion(REGION_NAME);
+    for (Object key : keys) {
+      r.destroy(key);
+    }
+  }
+
+  private static void verifyNoCacheLoaderLoads() throws Exception {
+    Region region = cache.getRegion(REGION_NAME);
+    ReturnKeyCacheLoader cacheLoader =
+        (ReturnKeyCacheLoader) region.getAttributes().getCacheLoader();
+    assertEquals(0/* expected */, cacheLoader.getLoads()/* actual */);
+  }
+
   private static void validateEntriesK1andK2(final String vm) {
     WaitCriterion ev = new WaitCriterion() {
       @Override
@@ -1076,6 +1164,8 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
 
   private static class ReturnKeyCacheLoader implements CacheLoader {
 
+    private AtomicInteger loads = new AtomicInteger();
+
     @Override
     public void close() {
       // Do nothing
@@ -1083,7 +1173,16 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
 
     @Override
     public Object load(LoaderHelper helper) throws CacheLoaderException {
+      incrementLoads();
       return helper.getKey();
     }
+
+    private void incrementLoads() {
+      this.loads.incrementAndGet();
+    }
+
+    private int getLoads() {
+      return this.loads.get();
+    }
   }
 }