You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/09/26 21:49:11 UTC
[geode] 01/01: GEM-2228-883: The previous fix for GEODE-3967 caused
an event with CME is dispatched. This has a side effect of data
inconsistency of WAN. The enhanced fix is to enqueue the CME event but not
to dispatch it.
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEM-2228-883
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9cb0131de13bc6a4987875f5a5efbbb05e73d988
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Sep 26 14:45:22 2018 -0700
GEM-2228-883: The previous fix for GEODE-3967 caused an event with CME
is dispatched. This has a side effect of data inconsistency of WAN.
The enhanced fix is to enqueue the CME event but not to dispatch it.
---
.../codeAnalysis/sanctionedDataSerializables.txt | 8 ++++--
.../wan/AbstractGatewaySenderEventProcessor.java | 14 ++++++++++
.../internal/cache/wan/GatewaySenderEventImpl.java | 32 ++++++++++++++++++++--
3 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index f5b8733..c599221 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1984,9 +1984,11 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
fromData,63
toData,87
-org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
-fromData,183
-toData,133
+org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4
+fromData,17
+fromDataPre_GEODE_1_8_0_0,183
+toData,17
+toDataPre_GEODE_1_8_0_0,133
org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
fromData,20
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index ea2f603..71cdac2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -562,6 +562,20 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread
}
}
}
+
+ // filter out the events with CME
+ Iterator<GatewaySenderEventImpl> cmeItr = filteredList.iterator();
+ while (cmeItr.hasNext()) {
+ GatewaySenderEventImpl event = cmeItr.next();
+ if (event.isConcurrencyConflict()) {
+ cmeItr.remove();
+ logger.debug("The CME event: {} is removed from Gateway Sender queue: {}", event,
+ sender);
+ statistics.incEventsNotQueued();
+ continue;
+ }
+ }
+
/*
* if (filteredList.isEmpty()) { eventQueueRemove(events.size()); continue; }
*/
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 7c74957..6ac2275 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.VersionedDataInputStream;
+import org.apache.geode.internal.VersionedDataSerializable;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.CachedDeserializableFactory;
import org.apache.geode.internal.cache.Conflatable;
@@ -62,7 +63,8 @@ import org.apache.geode.internal.size.Sizeable;
*
*/
public class GatewaySenderEventImpl
- implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable {
+ implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable,
+ VersionedDataSerializable {
private static final long serialVersionUID = -5690172020872255422L;
protected static final Object TOKEN_NULL = new Object();
@@ -171,6 +173,8 @@ public class GatewaySenderEventImpl
protected boolean isInitialized;
+ private transient boolean isConcurrencyConflict = false;
+
/**
* Is this thread in the process of serializing this event?
*/
@@ -310,6 +314,7 @@ public class GatewaySenderEventImpl
if (initialize) {
initialize();
}
+ this.isConcurrencyConflict = event.isConcurrencyConflict();
}
/**
@@ -671,7 +676,13 @@ public class GatewaySenderEventImpl
return GATEWAY_SENDER_EVENT_IMPL;
}
+ @Override
public void toData(DataOutput out) throws IOException {
+ toDataPre_GEODE_1_8_0_0(out);
+ DataSerializer.writeBoolean(this.isConcurrencyConflict, out);
+ }
+
+ public void toDataPre_GEODE_1_8_0_0(DataOutput out) throws IOException {
// Make sure we are initialized before we serialize.
initialize();
out.writeShort(VERSION);
@@ -695,7 +706,13 @@ public class GatewaySenderEventImpl
DataSerializer.writeObject(this.key, out);
}
+ @Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ fromDataPre_GEODE_1_8_0_0(in);
+ this.isConcurrencyConflict = DataSerializer.readBoolean(in);
+ }
+
+ public void fromDataPre_GEODE_1_8_0_0(DataInput in) throws IOException, ClassNotFoundException {
short version = in.readShort();
if (version != VERSION) {
// warning?`
@@ -742,7 +759,8 @@ public class GatewaySenderEventImpl
.append(";creationTime=").append(this.creationTime).append(";shadowKey=")
.append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
.append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
- .append(";bucketId=").append(this.bucketId).append("]");
+ .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
+ .append(this.isConcurrencyConflict).append("]");
return buffer.toString();
}
@@ -1125,6 +1143,14 @@ public class GatewaySenderEventImpl
return bucketId;
}
+ public boolean isConcurrencyConflict() {
+ return isConcurrencyConflict;
+ }
+
+ public boolean setConcurrencyConflict(boolean isConcurrencyConflict) {
+ return this.isConcurrencyConflict = isConcurrencyConflict;
+ }
+
/**
* @param tailKey the tailKey to set
*/
@@ -1141,7 +1167,7 @@ public class GatewaySenderEventImpl
@Override
public Version[] getSerializationVersions() {
- return null;
+ return new Version[] {Version.GEODE_180};
}
public int getSerializedValueSize() {