You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/01/21 01:45:15 UTC
[geode] 01/01: GEODE-3967: When a
ConcurrentCacheModificationException happens, GatewaySenderEventImpl
should save the conflict status and notify the gateway sender anyway;
SerialGatewaySender will handle it. In AbstractUpdateOperation's
doPutOrCreate, the third of the 3 tries of basicUpdate should allow both create and update.
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-3967
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 68054778c8030a88281cada6a693e72b935c9144
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Sat Jan 20 17:39:00 2018 -0800
GEODE-3967: When a ConcurrentCacheModificationException happens, GatewaySenderEventImpl should save the conflict status and notify the gateway sender anyway; SerialGatewaySender will handle it. In AbstractUpdateOperation's doPutOrCreate, the third of the 3 tries of basicUpdate should allow both create and update.
---
.../apache/geode/internal/cache/AbstractRegionMap.java | 3 +++
.../geode/internal/cache/AbstractUpdateOperation.java | 2 +-
.../org/apache/geode/internal/cache/LocalRegion.java | 18 +++++++++++++-----
.../internal/cache/wan/GatewaySenderEventImpl.java | 6 +++++-
.../wan/serial/SerialGatewaySenderEventProcessor.java | 9 ++++++---
5 files changed, 28 insertions(+), 10 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 487c35a..1fe9c00 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1185,6 +1185,7 @@ public abstract class AbstractRegionMap implements RegionMap {
true/* conflict with clear */, duringRI, true);
doPart3 = true;
} catch (ConcurrentCacheModificationException ccme) {
+ event.isConcurrencyConflict(true);
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
@@ -2096,6 +2097,7 @@ public abstract class AbstractRegionMap implements RegionMap {
}
} // !opCompleted
} catch (ConcurrentCacheModificationException ccme) {
+ event.isConcurrencyConflict(true);
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
@@ -2854,6 +2856,7 @@ public abstract class AbstractRegionMap implements RegionMap {
clearOccured = true;
owner.recordEvent(event);
} catch (ConcurrentCacheModificationException ccme) {
+ event.isConcurrencyConflict(true);
VersionTag tag = event.getVersionTag();
if (tag != null && tag.isTimeStampUpdated()) {
// Notify gateways of new time-stamp.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 585e131..a706abd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -175,7 +175,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
|| (rgn.getDataPolicy().withReplication() && rgn.getConcurrencyChecksEnabled())) {
overwriteDestroyed = true;
ev.makeCreate();
- rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */, lastMod,
+ rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */, lastMod,
overwriteDestroyed);
rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
updated = true;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index aca96d0..4a9ff44 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -2840,6 +2840,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
logger.debug("caught concurrent modification attempt when applying {}", event);
}
notifyBridgeClients(event);
+ notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+ : EnumListenerEvent.AFTER_CREATE, event);
}
if (!getDataView().isDeferredStats()) {
getCachePerfStats().endPut(startPut, event.isOriginRemote());
@@ -5618,6 +5620,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
logger.debug("caught concurrent modification attempt when applying {}", event);
}
notifyBridgeClients(event);
+ notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+ : EnumListenerEvent.AFTER_CREATE, event);
return false;
}
@@ -5844,6 +5848,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
updateTimeStampEvent.setGenerateCallbacks(false);
updateTimeStampEvent.distributedMember = event.getDistributedMember();
updateTimeStampEvent.setNewEventId(getSystem());
+ if (event.isConcurrencyConflict()) {
+ updateTimeStampEvent.isConcurrencyConflict(true);
+ }
if (event.getRegion() instanceof BucketRegion) {
BucketRegion bucketRegion = (BucketRegion) event.getRegion();
@@ -6100,8 +6107,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
- if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
- // isConcurrencyConflict is usually a concurrent cache modification problem
+ if (isPdxTypesRegion()) {
return;
}
@@ -6125,9 +6131,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
if (allRemoteDSIds != null) {
for (GatewaySender sender : getCache().getAllGatewaySenders()) {
if (allGatewaySenderIds.contains(sender.getId())) {
- // TODO: This is a BUG. Why return and not continue?
- if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
- return;
+ // if isConcurrencyConflict is true, only notify serial gateway sender
+ if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict())
+ && sender.isParallel()) {
+ continue;
}
if (logger.isDebugEnabled()) {
logger.debug("Notifying the GatewaySender : {}", sender.getId());
@@ -6485,6 +6492,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
// Notify clients only if its NOT a gateway event.
if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
notifyBridgeClients(event);
+ notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
}
return true; // event was elided
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 2748c7d..e744afc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -171,6 +171,8 @@ public class GatewaySenderEventImpl
protected boolean isInitialized;
+ public boolean isConcurrencyConflict = false;
+
/**
* Is this thread in the process of serializing this event?
*/
@@ -312,6 +314,7 @@ public class GatewaySenderEventImpl
if (initialize) {
initialize();
}
+ this.isConcurrencyConflict = event.isConcurrencyConflict();
}
/**
@@ -744,7 +747,8 @@ public class GatewaySenderEventImpl
.append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
- .append(";bucketId=").append(this.bucketId).append("]");
+ .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
+ .append(this.isConcurrencyConflict).append("]");
return buffer.toString();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..d5d0baa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -35,6 +35,7 @@ import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.wan.GatewayQueueEvent;
@@ -423,9 +424,11 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
} else {
// If it is not, create an uninitialized GatewayEventImpl and
// put it into the map of unprocessed events.
- senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP
- // ok
- handleSecondaryEvent(senderEvent);
+ if (!event.getOperation().equals(Operation.UPDATE_VERSION_STAMP)) {
+ senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP
+ // ok
+ handleSecondaryEvent(senderEvent);
+ }
}
}
}
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.