You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/03/31 00:21:59 UTC
incubator-geode git commit: add FORCE_INVALIDATE_EVENT fixes to geode
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-915 [created] fdc61fc7e
add FORCE_INVALIDATE_EVENT fixes to geode
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fdc61fc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fdc61fc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fdc61fc7
Branch: refs/heads/feature/GEODE-915
Commit: fdc61fc7e0c540b77dcf38d3813cdf7a3036d9d2
Parents: 82faa8a
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Wed Mar 30 15:10:29 2016 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Wed Mar 30 15:10:29 2016 -0700
----------------------------------------------------------------------
.../internal/cache/AbstractRegionMap.java | 66 +--
.../gemfire/internal/cache/EntryEventImpl.java | 3 +
.../gemfire/internal/cache/LocalRegion.java | 14 +-
.../gemfire/internal/cache/ProxyRegionMap.java | 11 +-
.../ClientServerForceInvalidateDUnitTest.java | 414 +++++++++++++++++++
5 files changed, 463 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 9058984..58f535d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -1614,7 +1614,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// Create an entry event only if the calling context is
// a receipt of a TXCommitMessage AND there are callbacks installed
// for this region
- boolean invokeCallbacks = shouldCreateCBEvent(owner, false/*isInvalidate*/, isRegionReady || inRI);
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
EntryEventImpl cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
@@ -1726,7 +1726,7 @@ public abstract class AbstractRegionMap implements RegionMap {
}
else {
try {
- boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
@@ -1788,7 +1788,7 @@ public abstract class AbstractRegionMap implements RegionMap {
if (!opCompleted) {
// already has value set to Token.DESTROYED
opCompleted = true;
- boolean invokeCallbacks = shouldCreateCBEvent(owner, false, isRegionReady || inRI);
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady || inRI);
cbEvent = createCBEvent(owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument, filterRoutingInfo, bridgeContext, txEntryState, versionTag, tailKey);
try {
@@ -1878,6 +1878,14 @@ public abstract class AbstractRegionMap implements RegionMap {
* If true then invalidates that throw EntryNotFoundException
* or that are already invalid will first call afterInvalidate on CacheListeners.
* The old value on the event passed to afterInvalidate will be null.
+ * If the region is not initialized then callbacks will not be done.
+ * This property only applies to non-transactional invalidates.
+ * Transactional invalidates ignore this property.
+ * Note that empty "proxy" regions on a client will not be sent invalidates
+ * from the server unless they also set the proxy InterestPolicy to ALL.
+ * If the invalidate is not sent then this property will not cause a listener
+ * on that client to be notified of the invalidate.
+ * A non-empty "caching-proxy" will receive invalidates from the server.
*/
public static boolean FORCE_INVALIDATE_EVENT = Boolean.getBoolean("gemfire.FORCE_INVALIDATE_EVENT");
@@ -1885,9 +1893,9 @@ public abstract class AbstractRegionMap implements RegionMap {
* If the FORCE_INVALIDATE_EVENT flag is true
* then invoke callbacks on the given event.
*/
- void forceInvalidateEvent(EntryEventImpl event) {
+ static void forceInvalidateEvent(EntryEventImpl event, LocalRegion owner) {
if (FORCE_INVALIDATE_EVENT) {
- event.invokeCallbacks(_getOwner(), false, false);
+ event.invokeCallbacks(owner, false, false);
}
}
@@ -1907,8 +1915,9 @@ public abstract class AbstractRegionMap implements RegionMap {
boolean didInvalidate = false;
RegionEntry invalidatedRe = null;
boolean clearOccured = false;
-
DiskRegion dr = owner.getDiskRegion();
+ boolean ownerIsInitialized = owner.isInitialized();
+ try {
// Fix for Bug #44431. We do NOT want to update the region and wait
// later for index INIT as region.clear() can cause inconsistency if
// happened in parallel as it also does index INIT.
@@ -1957,9 +1966,8 @@ public abstract class AbstractRegionMap implements RegionMap {
// that's okay - when writing an invalid into a disk, the
// region has been cleared (including this token)
}
- forceInvalidateEvent(event);
} else {
- owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+ owner.serverInvalidate(event);
if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
@@ -2031,17 +2039,20 @@ public abstract class AbstractRegionMap implements RegionMap {
if (forceNewEntry && event.isFromServer()) {
// don't invoke listeners - we didn't force new entries for
// CCU invalidations before 7.0, and listeners don't care
- event.inhibitCacheListenerNotification(true);
+ if (!FORCE_INVALIDATE_EVENT) {
+ event.inhibitCacheListenerNotification(true);
+ }
}
event.setRegionEntry(newRe);
- owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+ owner.serverInvalidate(event);
if (!forceNewEntry && event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
return false;
}
try {
- if (!owner.isInitialized() && owner.getDataPolicy().withReplication()) {
+ ownerIsInitialized = owner.isInitialized();
+ if (!ownerIsInitialized && owner.getDataPolicy().withReplication()) {
final int oldSize = owner.calculateRegionEntryValueSize(newRe);
invalidateEntry(event, newRe, oldSize);
}
@@ -2101,7 +2112,8 @@ public abstract class AbstractRegionMap implements RegionMap {
re = null;
}
if (re == null) {
- if (!owner.isInitialized()) {
+ ownerIsInitialized = owner.isInitialized();
+ if (!ownerIsInitialized) {
// when GII message arrived or processed later than invalidate
// message, the entry should be created as placeholder
RegionEntry newRe = haveTombstone? tombstone : getEntryFactory().createEntry(owner, event.getKey(),
@@ -2129,7 +2141,7 @@ public abstract class AbstractRegionMap implements RegionMap {
}
// bug #43287 - send event to server even if it's not in the client (LRU may have evicted it)
- owner.cacheWriteBeforeInvalidate(event, true, false);
+ owner.serverInvalidate(event);
if (owner.concurrencyChecksEnabled) {
if (event.getVersionTag() == null) {
// server did not perform the invalidation, so don't leave an invalid
@@ -2186,11 +2198,10 @@ public abstract class AbstractRegionMap implements RegionMap {
if (event.getVersionTag() != null && owner.getVersionVector() != null) {
owner.getVersionVector().recordVersion((InternalDistributedMember) event.getDistributedMember(), event.getVersionTag());
}
- forceInvalidateEvent(event);
}
else { // previous value not invalid
event.setRegionEntry(re);
- owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+ owner.serverInvalidate(event);
if (owner.concurrencyChecksEnabled && event.noVersionReceivedFromServer()) {
// server did not perform the invalidation, so don't leave an invalid
// entry here
@@ -2253,7 +2264,6 @@ public abstract class AbstractRegionMap implements RegionMap {
// is in region, do nothing
}
if (!entryExisted) {
- forceInvalidateEvent(event);
owner.checkEntryNotFound(event.getKey());
}
} // while(retry)
@@ -2284,6 +2294,11 @@ public abstract class AbstractRegionMap implements RegionMap {
}
}
return didInvalidate;
+ } finally {
+ if (ownerIsInitialized) {
+ forceInvalidateEvent(event, owner);
+ }
+ }
}
protected void invalidateNewEntry(EntryEventImpl event,
@@ -2410,7 +2425,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// a receipt of a TXCommitMessage AND there are callbacks
// installed
// for this region
- boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, owner.isInitialized());
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
@@ -2474,7 +2489,7 @@ public abstract class AbstractRegionMap implements RegionMap {
}
}
if (!opCompleted) {
- boolean invokeCallbacks = shouldCreateCBEvent( owner, true /* isInvalidate */, owner.isInitialized());
+ boolean invokeCallbacks = shouldCreateCBEvent( owner, owner.isInitialized());
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
@@ -2539,7 +2554,7 @@ public abstract class AbstractRegionMap implements RegionMap {
// a receipt of a TXCommitMessage AND there are callbacks
// installed
// for this region
- boolean invokeCallbacks = shouldCreateCBEvent(owner, true, owner.isInitialized());
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, owner.isInitialized());
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner,
localOp ? Operation.LOCAL_INVALIDATE : Operation.INVALIDATE,
@@ -3316,7 +3331,7 @@ public abstract class AbstractRegionMap implements RegionMap {
final boolean isRegionReady = owner.isInitialized();
EntryEventImpl cbEvent = null;
EntryEventImpl sqlfEvent = null;
- boolean invokeCallbacks = shouldCreateCBEvent(owner, false /*isInvalidate*/, isRegionReady);
+ boolean invokeCallbacks = shouldCreateCBEvent(owner, isRegionReady);
boolean cbEventInPending = false;
cbEvent = createCBEvent(owner, putOp, key, newValue, txId,
txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
@@ -3739,7 +3754,7 @@ public abstract class AbstractRegionMap implements RegionMap {
}
static boolean shouldCreateCBEvent( final LocalRegion owner,
- final boolean isInvalidate, final boolean isInitialized)
+ final boolean isInitialized)
{
LocalRegion lr = owner;
boolean isPartitioned = lr.isUsedForPartitionedRegionBucket();
@@ -3752,17 +3767,10 @@ public abstract class AbstractRegionMap implements RegionMap {
}*/
lr = owner.getPartitionedRegion();
}
- if (isInvalidate) { // ignore shouldNotifyGatewayHub check for invalidates
- return (isPartitioned || isInitialized)
+ return (isPartitioned || isInitialized)
&& (lr.shouldDispatchListenerEvent()
|| lr.shouldNotifyBridgeClients()
|| lr.getConcurrencyChecksEnabled());
- } else {
- return (isPartitioned || isInitialized)
- && (lr.shouldDispatchListenerEvent()
- || lr.shouldNotifyBridgeClients()
- || lr.getConcurrencyChecksEnabled());
- }
}
/** create a callback event for applying a transactional change to the local cache */
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
index dfd20ef..5935faa 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EntryEventImpl.java
@@ -2338,6 +2338,9 @@ public class EntryEventImpl
if (callbacksInvoked()) {
buf.append(";callbacksInvoked");
}
+ if (inhibitCacheListenerNotification()) {
+ buf.append(";inhibitCacheListenerNotification");
+ }
if (this.versionTag != null) {
buf.append(";version=").append(this.versionTag);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index c727a53..596a583 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -3262,9 +3262,8 @@ public class LocalRegion extends AbstractRegion
/**
* @since 5.7
*/
- protected void serverInvalidate(EntryEventImpl event, boolean invokeCallbacks,
- boolean forceNewEntry) {
- if (event.getOperation().isDistributed()) {
+ void serverInvalidate(EntryEventImpl event) {
+ if (event.getOperation().isDistributed() && !event.isOriginRemote()) {
ServerRegionProxy mySRP = getServerProxy();
if (mySRP != null) {
mySRP.invalidate(event);
@@ -3385,15 +3384,6 @@ public class LocalRegion extends AbstractRegion
}
/**
- * @since 5.7
- */
- void cacheWriteBeforeInvalidate(EntryEventImpl event, boolean invokeCallbacks, boolean forceNewEntry) {
- if (!event.getOperation().isLocal() && !event.isOriginRemote()) {
- serverInvalidate(event, invokeCallbacks, forceNewEntry);
- }
- }
-
- /**
* @see DistributedRegion#cacheWriteBeforePut(EntryEventImpl, Set, CacheWriter, boolean, Object)
* @param event
* @param netWriteRecipients
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
index dc9db46..67e13fc 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/ProxyRegionMap.java
@@ -188,9 +188,12 @@ final class ProxyRegionMap implements RegionMap {
throws EntryNotFoundException {
if (event.getOperation().isLocal()) {
+ if (this.owner.isInitialized()) {
+ AbstractRegionMap.forceInvalidateEvent(event, this.owner);
+ }
throw new EntryNotFoundException(event.getKey().toString());
}
- this.owner.cacheWriteBeforeInvalidate(event, invokeCallbacks, forceNewEntry);
+ this.owner.serverInvalidate(event);
this.owner.recordEvent(event);
this.owner.basicInvalidatePart2(markerEntry, event, false /*Clear conflict occurred */, true);
this.owner.basicInvalidatePart3(markerEntry, event, true);
@@ -275,7 +278,7 @@ final class ProxyRegionMap implements RegionMap {
txEvent.addDestroy(this.owner, markerEntry, key,aCallbackArgument);
}
if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
- false, !inTokenMode)) {
+ !inTokenMode)) {
// fix for bug 39526
EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, op,
key, null, txId, txEvent, eventId, aCallbackArgument,filterRoutingInfo,bridgeContext, txEntryState, versionTag, tailKey);
@@ -305,7 +308,7 @@ final class ProxyRegionMap implements RegionMap {
txEvent.addInvalidate(this.owner, markerEntry, key, newValue,aCallbackArgument);
}
if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
- true, this.owner.isInitialized())) {
+ this.owner.isInitialized())) {
// fix for bug 39526
boolean cbEventInPending = false;
EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner,
@@ -339,7 +342,7 @@ final class ProxyRegionMap implements RegionMap {
txEvent.addPut(putOp, this.owner, markerEntry, key, newValue,aCallbackArgument);
}
if (AbstractRegionMap.shouldCreateCBEvent(this.owner,
- false, this.owner.isInitialized())) {
+ this.owner.isInitialized())) {
// fix for bug 39526
boolean cbEventInPending = false;
EntryEventImpl e = AbstractRegionMap.createCBEvent(this.owner, putOp, key,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fdc61fc7/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
new file mode 100644
index 0000000..f7454d5
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+import static com.jayway.awaitility.Awaitility.with;
+
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.InterestPolicy;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.SubscriptionAttributes;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.AbstractRegionMap;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * Tests client server FORCE_INVALIDATE
+ */
+public class ClientServerForceInvalidateDUnitTest extends CacheTestCase
+{
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger logger = LogService.getLogger();
+
+ private static Region<String, String> region1;
+
+ private static final String REGION_NAME1 = "ClientServerForceInvalidateDUnitTest_region1";
+
+ private static Host host;
+
+ private static VM server1;
+ private static VM server2;
+
+ /** constructor */
+ public ClientServerForceInvalidateDUnitTest(String name) {
+ super(name);
+ }
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ host = Host.getHost(0);
+ server1 = host.getVM(0);
+ server2 = host.getVM(1);
+ }
+
+ private int initServerCache(VM vm, boolean concurrencyChecksEnabled, boolean partitioned) {
+ return vm.invoke(() -> createServerCache(concurrencyChecksEnabled, partitioned, 0));
+ }
+
+ public void testForceInvalidateOnCachingProxyWithConcurrencyChecks() throws Exception {
+ dotestForceInvalidate(true, true, false, true);
+ }
+ public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnServer() throws Exception {
+ dotestForceInvalidate(true, false, false, true);
+ }
+ public void testForceInvalidateOnCachingProxyWithConcurrencyChecksOnlyOnClient() throws Exception {
+ dotestForceInvalidate(false, true, false, true);
+ }
+ public void testForceInvalidateOnProxyWithConcurrencyChecks() throws Exception {
+ dotestForceInvalidate(true, true, true, true);
+ }
+ public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnServer() throws Exception {
+ dotestForceInvalidate(true, false, true, true);
+ }
+ public void testForceInvalidateOnProxyWithConcurrencyChecksOnlyOnClient() throws Exception {
+ dotestForceInvalidate(false, true, true, true);
+ }
+ public void testForceInvalidateOnCachingProxyWithConcurrencyChecksServerReplicated() throws Exception {
+ dotestForceInvalidate(true, true, false, false);
+ }
+ public void testForceInvalidateOnProxyWithConcurrencyChecksServerReplicated() throws Exception {
+ dotestForceInvalidate(true, true, true, false);
+ }
+
+ /**
+ * 1. create an entry
+ * 2. Install a observer to pause sending subscription events to the client
+ * 3. invalidate the entry from the server (it will be done on server but pause
+ * prevents it from being sent to the client).
+ * 4. verify that afterInvalidate was invoked on the server.
+ * 5. change the same entry (do a put). Both the client and server now have the
+ * latest version which is this update.
+ * 6. unpause the observer so that it now sends invalidate event to client.
+ * It will arrive late and not be done because of concurrency checks.
+ * 7. verify that afterInvalidate was invoked on the client.
+ */
+ public void testInvalidateLosingOnConcurrencyChecks() throws Exception {
+ try {
+ setupServerAndClientVMs(true, true, false, false);
+ final String key = "delayInvalidate";
+ region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+ region1.put(key, "1000");
+ logger.info("installing observers");
+ server1.invoke(() -> installObserver());
+ server2.invoke(() -> installObserver());
+
+ server2.invoke(() -> invalidateOnServer(key));
+
+ validateServerListenerInvoked();
+
+ logger.info("putting a new value 1001");
+ region1.put(key, "1001");
+ logger.info("UnPausing observers");
+ server1.invoke(() -> unpauseObserver());
+ server2.invoke(() -> unpauseObserver());
+
+ waitForClientInvalidate();
+
+ } finally {
+ server1.invoke(() -> cleanupObserver());
+ server2.invoke(() -> cleanupObserver());
+ }
+ }
+
+ private static void installObserver() {
+ CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
+ ClientServerObserverHolder.setInstance(new DelaySendingEvent());
+ }
+
+ private static void unpauseObserver() {
+ DelaySendingEvent observer = (DelaySendingEvent) ClientServerObserverHolder.getInstance();
+ observer.latch.countDown();
+ }
+
+ private static void cleanupObserver() {
+ CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
+ ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
+ }
+
+ private static void invalidateOnServer(final Object key) {
+ Region<?,?> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+ r.invalidate(key);
+ }
+ private static void createOnServer(final Object key, final Object value) {
+ @SuppressWarnings("unchecked")
+ Region<Object, Object> r = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+ r.create(key, value);
+ }
+
+ private void waitForClientInvalidate() {
+ with().pollInterval(10, TimeUnit.MILLISECONDS).await().atMost(20, TimeUnit.SECONDS)
+ .until(() -> hasClientListenerAfterInvalidateBeenInvoked());
+ }
+
+ static class DelaySendingEvent extends ClientServerObserverAdapter {
+ CountDownLatch latch = new CountDownLatch(1);
+ @Override
+ public void afterMessageCreation(Message msg) {
+ try {
+ logger.info("waiting in DelaySendingEvent...");
+ latch.await();
+ logger.info("finished waiting in DelaySendingEvent");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ /**
+ * 1. Invalidate a non-existent entry from the server.
+ * 2. Validate that the servers see after invalidate.
+ * 3. Validate that the subscribed client invokes after invalidate.
+ */
+ private void dotestForceInvalidate(boolean concurrencyChecksOnServer, boolean concurrencyChecksOnClient, boolean clientEmpty, boolean serverPartitioned) throws Exception {
+ setupServerAndClientVMs(concurrencyChecksOnServer, concurrencyChecksOnClient, clientEmpty, serverPartitioned);
+
+ server2.invoke(() -> createOnServer("key", "value"));
+ region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+ server2.invoke(() -> invalidateOnServer("key"));
+
+ validateServerListenerInvoked();
+ waitForClientInvalidate();
+ }
+
+ private void setupServerAndClientVMs(boolean concurrencyChecksOnServer, boolean concurrencyChecksOnClient, boolean clientEmpty, boolean serverPartitioned) throws Exception {
+ int port1 = initServerCache(server1, concurrencyChecksOnServer, serverPartitioned); // vm0
+ int port2 = initServerCache(server2, concurrencyChecksOnServer, serverPartitioned); // vm1
+ String serverName = NetworkUtils.getServerHostName(Host.getHost(0));
+ createClientCache(serverName, port1, port2, clientEmpty, concurrencyChecksOnClient);
+ logger.info("testing force invalidate on on client");
+ }
+
+ private void validateServerListenerInvoked() {
+ boolean listenerInvoked = server1.invoke(() -> validateOnServer())
+ || server2.invoke(() -> validateOnServer());
+ assertTrue(listenerInvoked);
+ }
+
+ private static boolean validateOnServer() {
+ Region<?,?> region = GemFireCacheImpl.getExisting().getRegion(REGION_NAME1);
+ CacheListener<?,?>[] listeners = region.getAttributes().getCacheListeners();
+ for (CacheListener<?,?> listener : listeners) {
+ if (listener instanceof ServerListener) {
+ ServerListener serverListener = (ServerListener) listener;
+ if (serverListener.afterInvalidateInvoked) {
+ return true;
+ }
+ }
+ }
+ return false;
+
+ }
+
+ private boolean hasClientListenerAfterInvalidateBeenInvoked() {
+ Region<?,?> region = getCache().getRegion(REGION_NAME1);
+ CacheListener<?,?>[] listeners = region.getAttributes().getCacheListeners();
+ for (CacheListener<?,?> listener : listeners) {
+ if (listener instanceof ClientListener) {
+ ClientListener clientListener = (ClientListener) listener;
+ if (clientListener.afterInvalidateInvoked) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ private static Integer createServerCache(Boolean concurrencyChecksEnabled, Boolean partitioned, Integer maxThreads)
+ throws Exception {
+ AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
+ Properties props = new Properties();
+ Cache cache = new ClientServerForceInvalidateDUnitTest("temp").createCacheV(props);
+ RegionFactory<String, String> factory = cache.createRegionFactory();
+ if (partitioned) {
+ factory.setDataPolicy(DataPolicy.PARTITION);
+ factory.setPartitionAttributes(new PartitionAttributesFactory<String, String>()
+ .setRedundantCopies(0)
+ .setTotalNumBuckets(251)
+ .create());
+ } else {
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ }
+ factory.setConcurrencyChecksEnabled(concurrencyChecksEnabled);
+ factory.addCacheListener(new ServerListener());
+ Region<String, String> r1 = factory.create(REGION_NAME1);
+ assertNotNull(r1);
+
+ CacheServer server = cache.addCacheServer();
+ int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ logger.info("Starting server on port " + port);
+ server.setPort(port);
+ server.setMaxThreads(maxThreads.intValue());
+ server.start();
+ logger.info("Started server on port " + server.getPort());
+ return new Integer(server.getPort());
+
+ }
+
+ public static void createClientCache(String h, int port1, int port2, boolean empty, boolean concurrenctChecksEnabled)
+ throws Exception {
+ AbstractRegionMap.FORCE_INVALIDATE_EVENT = true;
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", "");
+ Cache cache = new ClientServerForceInvalidateDUnitTest("temp").createCacheV(props);
+ PoolImpl p = (PoolImpl)PoolManager.createFactory()
+ .addServer(h, port1)
+ .addServer(h, port2)
+ .setSubscriptionEnabled(true)
+ .setThreadLocalConnections(true)
+ .setReadTimeout(1000)
+ .setSocketBufferSize(32768)
+ .setMinConnections(3)
+ .setSubscriptionRedundancy(-1)
+ .setPingInterval(2000)
+ .create("ClientServerForceInvalidate2DUnitTestPool");
+
+ RegionFactory<String, String> factory = cache.createRegionFactory();
+ if (empty) {
+ factory.setDataPolicy(DataPolicy.EMPTY);
+ factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+ } else {
+ factory.setDataPolicy(DataPolicy.NORMAL);
+ }
+ factory.setPoolName(p.getName());
+ factory.setConcurrencyChecksEnabled(concurrenctChecksEnabled);
+ region1 = factory.create(REGION_NAME1);
+ region1.registerInterest("ALL_KEYS", InterestResultPolicy.NONE, false, false);
+ region1.getAttributesMutator().addCacheListener(new ClientListener());
+ assertNotNull(region1);
+ with().pollDelay(1, TimeUnit.MILLISECONDS).pollInterval(1, TimeUnit.SECONDS).await().atMost(60, TimeUnit.SECONDS)
+ .until(() -> poolReady(p));
+ }
+
+ private static boolean poolReady(final PoolImpl pool) {
+ try {
+ Connection conn = pool.acquireConnection();
+ if (conn == null) {
+ //excuse = "acquireConnection returned null?";
+ return false;
+ }
+ return true;
+ } catch (NoAvailableServersException e) {
+ //excuse = "Cannot find a server: " + e;
+ return false;
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ private Cache createCacheV(Properties props) throws Exception
+ {
+ DistributedSystem ds = getSystem(props);
+ assertNotNull(ds);
+ ds.disconnect();
+ ds = getSystem(props);
+ Cache cache = getCache();
+ assertNotNull(cache);
+ return cache;
+ }
+
+ static class ClientListener extends CacheListenerAdapter<String, String> {
+ public boolean afterInvalidateInvoked;
+ @Override
+ public void afterCreate(EntryEvent<String, String> event) {
+ super.afterCreate(event);
+ logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" + System.currentTimeMillis());
+ }
+ @Override
+ public void afterUpdate(EntryEvent<String, String> event) {
+ super.afterUpdate(event);
+ logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" + System.currentTimeMillis());
+ }
+ @Override
+ public void afterInvalidate(final EntryEvent<String, String> event) {
+ super.afterInvalidate(event);
+ afterInvalidateInvoked = true;
+ String prefix = "";
+ if (!event.isOriginRemote()) {
+ prefix = " ";
+ }
+ logger.info(prefix + "afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" + System.currentTimeMillis());
+
+ }
+ }
+ static class ServerListener extends CacheListenerAdapter<String, String> {
+ boolean afterInvalidateInvoked;
+ @Override
+ public void afterCreate(EntryEvent<String, String> event) {
+ super.afterCreate(event);
+ logger.info("afterCreate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" + System.currentTimeMillis());
+ }
+ @Override
+ public void afterUpdate(EntryEvent<String, String> event) {
+ super.afterUpdate(event);
+ logger.info("afterUpdate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" + System.currentTimeMillis());
+ }
+ @Override
+ public void afterInvalidate(EntryEvent<String, String> event) {
+ super.afterInvalidate(event);
+ afterInvalidateInvoked = true;
+ logger.info("afterInvalidate: {" + event.getOldValue() + " -> " + event.getNewValue() + "} at=" + System.currentTimeMillis());
+ }
+ }
+
+ @Override
+ protected final void postTearDownCacheTestCase() throws Exception {
+ // close the clients first
+ closeCache();
+ // then close the servers
+ server1.invoke(() -> closeCache());
+ server2.invoke(() -> closeCache());
+ }
+
+ @SuppressWarnings("deprecation")
+ public static void closeCache()
+ {
+ AbstractRegionMap.FORCE_INVALIDATE_EVENT = false;
+ Cache cache = new ClientServerForceInvalidateDUnitTest("temp").getCache();
+ if (cache != null && !cache.isClosed()) {
+ cache.close();
+ cache.getDistributedSystem().disconnect();
+ }
+ }
+
+}