You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2017/03/07 19:21:51 UTC
[28/51] [abbrv] geode git commit: GEODE-2547: Interest registration
no longer causes a CacheLoader to be invoked
GEODE-2547: Interest registration no longer causes a CacheLoader to be invoked
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1a36d36e
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1a36d36e
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1a36d36e
Branch: refs/heads/feature/GEM-1195
Commit: 1a36d36ec90d91094689cc3cb30c21be9b25276b
Parents: fb1fdf9
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Tue Feb 28 13:43:50 2017 -0800
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Tue Feb 28 15:55:38 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/geode/cache/Operation.java | 16 ++++
.../geode/internal/cache/DistributedRegion.java | 30 ++++--
.../org/apache/geode/internal/cache/OpType.java | 2 +
.../cache/tier/sockets/BaseCommand.java | 18 ++--
.../tier/sockets/InterestListDUnitTest.java | 99 ++++++++++++++++++++
5 files changed, 148 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/cache/Operation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/Operation.java b/geode-core/src/main/java/org/apache/geode/cache/Operation.java
index 9b2227b..d835b6c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/Operation.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/Operation.java
@@ -49,6 +49,7 @@ public final class Operation implements java.io.Serializable {
private static final byte OP_TYPE_CLEAR = OpType.CLEAR;
private static final byte OP_TYPE_MARKER = OpType.MARKER;
private static final byte OP_TYPE_UPDATE_VERSION = OpType.UPDATE_ENTRY_VERSION;
+ private static final byte OP_TYPE_GET_FOR_REGISTER_INTEREST = OpType.GET_FOR_REGISTER_INTEREST;
private static final int OP_DETAILS_NONE = 0;
private static final int OP_DETAILS_SEARCH = 1;
@@ -531,6 +532,14 @@ public final class Operation implements java.io.Serializable {
false, // isRegion
OP_TYPE_DESTROY, OP_DETAILS_REMOVEALL);
+ /**
+ * A 'get for register interest' operation.
+ */
+ public static final Operation GET_FOR_REGISTER_INTEREST =
+ new Operation("GET_FOR_REGISTER_INTEREST", false, // isLocal
+ false, // isRegion
+ OP_TYPE_GET_FOR_REGISTER_INTEREST, OP_DETAILS_NONE);
+
/** The name of this mirror type. */
private final transient String name;
@@ -636,6 +645,13 @@ public final class Operation implements java.io.Serializable {
}
/**
+ * Returns true if this operation is a get for register interest.
+ */
+ public boolean isGetForRegisterInterest() {
+ return this.opType == OP_TYPE_GET_FOR_REGISTER_INTEREST;
+ }
+
+ /**
* Returns true if the operation invalidated an entry.
*/
public boolean isInvalidate() {
http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index cc6ccf7..b9cdfd7 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2302,17 +2302,27 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
if (requestingClient != null) {
event.setContext(requestingClient);
}
- SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
- try {
- processor.initialize(this, key, aCallbackArgument);
- // processor fills in event
- processor.doSearchAndLoad(event, txState, localValue);
- if (clientEvent != null && clientEvent.getVersionTag() == null) {
- clientEvent.setVersionTag(event.getVersionTag());
+ // If this event is because of a register interest call, don't invoke the CacheLoader
+ boolean getForRegisterInterest = clientEvent != null && clientEvent.getOperation() != null
+ && clientEvent.getOperation().isGetForRegisterInterest();
+ if (!getForRegisterInterest) {
+ SearchLoadAndWriteProcessor processor = SearchLoadAndWriteProcessor.getProcessor();
+ try {
+ processor.initialize(this, key, aCallbackArgument);
+ // processor fills in event
+ processor.doSearchAndLoad(event, txState, localValue);
+ if (clientEvent != null && clientEvent.getVersionTag() == null) {
+ clientEvent.setVersionTag(event.getVersionTag());
+ }
+ lastModified = processor.getLastModified();
+ } finally {
+ processor.release();
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("DistributedRegion.findObjectInSystem skipping loader for region="
+ + getFullPath() + "; key=" + key);
}
- lastModified = processor.getLastModified();
- } finally {
- processor.release();
}
}
if (event.hasNewValue() && !isMemoryThresholdReachedForLoad()) {
http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
index 7685988..ff36a57 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/OpType.java
@@ -49,6 +49,8 @@ public final class OpType {
public static final byte UPDATE_ENTRY_VERSION = 11;
+ public static final byte GET_FOR_REGISTER_INTEREST = 12;
+
public static final byte CLEAR = 16;
public static final byte MARKER = 32;
http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 5379605..d217672 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -1074,7 +1074,7 @@ public abstract class BaseCommand implements Command {
if (region != null) {
if (region.containsKey(entryKey) || region.containsTombstone(entryKey)) {
- VersionTagHolder versionHolder = new VersionTagHolder();
+ VersionTagHolder versionHolder = createVersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
// From Get70.getValueAndIsObject()
Object data = region.get(entryKey, null, true, true, true, id, versionHolder, true);
@@ -1161,7 +1161,7 @@ public abstract class BaseCommand implements Command {
}
for (Object key : region.keySet(true)) {
- VersionTagHolder versionHolder = new VersionTagHolder();
+ VersionTagHolder versionHolder = createVersionTagHolder();
if (keyPattern != null) {
if (!(key instanceof String)) {
// key is not a String, cannot apply regex to this entry
@@ -1263,12 +1263,10 @@ public abstract class BaseCommand implements Command {
public static void appendNewRegisterInterestResponseChunkFromLocal(LocalRegion region,
VersionedObjectList values, Object riKeys, Set keySet, ServerConnection servConn)
throws IOException {
- Object key = null;
- VersionTagHolder versionHolder = null;
ClientProxyMembershipID requestingClient = servConn == null ? null : servConn.getProxyID();
for (Iterator it = keySet.iterator(); it.hasNext();) {
- key = it.next();
- versionHolder = new VersionTagHolder();
+ Object key = it.next();
+ VersionTagHolder versionHolder = createVersionTagHolder();
Object value = region.get(key, null, true, true, true, requestingClient, versionHolder, true);
@@ -1454,7 +1452,7 @@ public abstract class BaseCommand implements Command {
for (Iterator it = keyList.iterator(); it.hasNext();) {
Object key = it.next();
if (region.containsKey(key) || region.containsTombstone(key)) {
- VersionTagHolder versionHolder = new VersionTagHolder();
+ VersionTagHolder versionHolder = createVersionTagHolder();
ClientProxyMembershipID id = servConn == null ? null : servConn.getProxyID();
data = region.get(key, null, true, true, true, id, versionHolder, true);
@@ -1475,6 +1473,12 @@ public abstract class BaseCommand implements Command {
sendNewRegisterInterestResponseChunk(region, keyList, values, true, servConn);
}
+ private static VersionTagHolder createVersionTagHolder() {
+ VersionTagHolder versionHolder = new VersionTagHolder();
+ versionHolder.setOperation(Operation.GET_FOR_REGISTER_INTEREST);
+ return versionHolder;
+ }
+
/**
* Append an interest response
*
http://git-wip-us.apache.org/repos/asf/geode/blob/1a36d36e/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
index e5fb5fc..d8164f1 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListDUnitTest.java
@@ -479,6 +479,80 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
vm1.invoke(() -> InterestListDUnitTest.confirmNoCacheListenerInvalidates());
}
+ @Test
+ public void testRegisterInterestSingleKeyWithDestroyOnReplicatedRegionWithCacheLoader() {
+ List keysToDestroy = new ArrayList();
+ keysToDestroy.add("0");
+ runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
+ }
+
+ @Test
+ public void testRegisterInterestSingleKeyWithDestroyOnPartitionedRegionWithCacheLoader() {
+ List keysToDestroy = new ArrayList();
+ keysToDestroy.add("0");
+ runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
+ }
+
+ @Test
+ public void testRegisterInterestListOfKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
+ List keysToDestroy = new ArrayList();
+ for (int i = 0; i < 5; i++) {
+ keysToDestroy.add(String.valueOf(i));
+ }
+ runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, keysToDestroy);
+ }
+
+ @Test
+ public void testRegisterInterestListOfKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
+ List keysToDestroy = new ArrayList();
+ for (int i = 0; i < 5; i++) {
+ keysToDestroy.add(String.valueOf(i));
+ }
+ runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, keysToDestroy);
+ }
+
+ @Test
+ public void testRegisterInterestAllKeysWithDestroyOnReplicatedRegionWithCacheLoader() {
+ List keysToDestroy = new ArrayList();
+ keysToDestroy.add("0");
+ runRegisterInterestWithDestroyAndCacheLoaderTest(true, keysToDestroy, "ALL_KEYS");
+ }
+
+ @Test
+ public void testRegisterInterestAllKeysWithDestroyOnPartitionedRegionWithCacheLoader() {
+ List keysToDestroy = new ArrayList();
+ keysToDestroy.add("0");
+ runRegisterInterestWithDestroyAndCacheLoaderTest(false, keysToDestroy, "ALL_KEYS");
+ }
+
+ private void runRegisterInterestWithDestroyAndCacheLoaderTest(boolean addReplicatedRegion,
+ List keysToDestroy, Object keyToRegister) {
+ // The server was already started with a replicated region. Bounce it if necessary
+ int port1 = PORT1;
+ if (!addReplicatedRegion) {
+ vm0.invoke(() -> closeCache());
+ port1 =
+ ((Integer) vm0.invoke(() -> InterestListDUnitTest.createServerCache(addReplicatedRegion)))
+ .intValue();
+ }
+ final int port = port1;
+
+ // Add a cache loader to the region
+ vm0.invoke(() -> addCacheLoader());
+
+ // Create client cache
+ vm1.invoke(() -> createClientCache(NetworkUtils.getServerHostName(vm0.getHost()), port));
+
+ // Destroy appropriate key(s)
+ vm1.invoke(() -> destroyKeys(keysToDestroy));
+
+ // Register interest in appropriate keys(s)
+ vm1.invoke(() -> registerKey(keyToRegister));
+
+ // Verify CacheLoader was not invoked
+ vm0.invoke(() -> verifyNoCacheLoaderLoads());
+ }
+
private void createCache(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
@@ -905,6 +979,20 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
}
}
+ private static void destroyKeys(List keys) {
+ Region r = cache.getRegion(REGION_NAME);
+ for (Object key : keys) {
+ r.destroy(key);
+ }
+ }
+
+ private static void verifyNoCacheLoaderLoads() throws Exception {
+ Region region = cache.getRegion(REGION_NAME);
+ ReturnKeyCacheLoader cacheLoader =
+ (ReturnKeyCacheLoader) region.getAttributes().getCacheLoader();
+ assertEquals(0/* expected */, cacheLoader.getLoads()/* actual */);
+ }
+
private static void validateEntriesK1andK2(final String vm) {
WaitCriterion ev = new WaitCriterion() {
@Override
@@ -1076,6 +1164,8 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
private static class ReturnKeyCacheLoader implements CacheLoader {
+ private AtomicInteger loads = new AtomicInteger();
+
@Override
public void close() {
// Do nothing
@@ -1083,7 +1173,16 @@ public class InterestListDUnitTest extends JUnit4DistributedTestCase {
@Override
public Object load(LoaderHelper helper) throws CacheLoaderException {
+ incrementLoads();
return helper.getKey();
}
+
+ private void incrementLoads() {
+ this.loads.incrementAndGet();
+ }
+
+ private int getLoads() {
+ return this.loads.get();
+ }
}
}