You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/09/26 21:49:11 UTC

[geode] 01/01: GEM-2228-883: The previous fix for GEODE-3967 caused events with a CME to be dispatched. This has the side effect of causing WAN data inconsistency. The enhanced fix is to enqueue the CME event but not dispatch it.

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEM-2228-883
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9cb0131de13bc6a4987875f5a5efbbb05e73d988
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Wed Sep 26 14:45:22 2018 -0700

    GEM-2228-883: The previous fix for GEODE-3967 caused events with a CME
    to be dispatched. This has the side effect of causing WAN data
    inconsistency. The enhanced fix is to enqueue the CME event but not
    dispatch it.
---
 .../codeAnalysis/sanctionedDataSerializables.txt   |  8 ++++--
 .../wan/AbstractGatewaySenderEventProcessor.java   | 14 ++++++++++
 .../internal/cache/wan/GatewaySenderEventImpl.java | 32 ++++++++++++++++++++--
 3 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index f5b8733..c599221 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1984,9 +1984,11 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
 fromData,63
 toData,87
 
-org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
-fromData,183
-toData,133
+org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4
+fromData,17
+fromDataPre_GEODE_1_8_0_0,183
+toData,17
+toDataPre_GEODE_1_8_0_0,133
 
 org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
 fromData,20
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index ea2f603..71cdac2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -562,6 +562,20 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread
                 }
               }
             }
+
+            // filter out the events with CME
+            Iterator<GatewaySenderEventImpl> cmeItr = filteredList.iterator();
+            while (cmeItr.hasNext()) {
+              GatewaySenderEventImpl event = cmeItr.next();
+              if (event.isConcurrencyConflict()) {
+                cmeItr.remove();
+                logger.debug("The CME event: {} is removed from Gateway Sender queue: {}", event,
+                    sender);
+                statistics.incEventsNotQueued();
+                continue;
+              }
+            }
+
             /*
              * if (filteredList.isEmpty()) { eventQueueRemove(events.size()); continue; }
              */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 7c74957..6ac2275 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.VersionedDataInputStream;
+import org.apache.geode.internal.VersionedDataSerializable;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.CachedDeserializableFactory;
 import org.apache.geode.internal.cache.Conflatable;
@@ -62,7 +63,8 @@ import org.apache.geode.internal.size.Sizeable;
  *
  */
 public class GatewaySenderEventImpl
-    implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable {
+    implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable,
+    VersionedDataSerializable {
   private static final long serialVersionUID = -5690172020872255422L;
 
   protected static final Object TOKEN_NULL = new Object();
@@ -171,6 +173,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  private transient boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -310,6 +314,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
@@ -671,7 +676,13 @@ public class GatewaySenderEventImpl
     return GATEWAY_SENDER_EVENT_IMPL;
   }
 
+  @Override
   public void toData(DataOutput out) throws IOException {
+    toDataPre_GEODE_1_8_0_0(out);
+    DataSerializer.writeBoolean(this.isConcurrencyConflict, out);
+  }
+
+  public void toDataPre_GEODE_1_8_0_0(DataOutput out) throws IOException {
     // Make sure we are initialized before we serialize.
     initialize();
     out.writeShort(VERSION);
@@ -695,7 +706,13 @@ public class GatewaySenderEventImpl
     DataSerializer.writeObject(this.key, out);
   }
 
+  @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    fromDataPre_GEODE_1_8_0_0(in);
+    this.isConcurrencyConflict = DataSerializer.readBoolean(in);
+  }
+
+  public void fromDataPre_GEODE_1_8_0_0(DataInput in) throws IOException, ClassNotFoundException {
     short version = in.readShort();
     if (version != VERSION) {
       // warning?`
@@ -742,7 +759,8 @@ public class GatewaySenderEventImpl
         .append(";creationTime=").append(this.creationTime).append(";shadowKey=")
         .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
         .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
-        .append(";bucketId=").append(this.bucketId).append("]");
+        .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
+        .append(this.isConcurrencyConflict).append("]");
     return buffer.toString();
   }
 
@@ -1125,6 +1143,14 @@ public class GatewaySenderEventImpl
     return bucketId;
   }
 
+  public boolean isConcurrencyConflict() {
+    return isConcurrencyConflict;
+  }
+
+  public boolean setConcurrencyConflict(boolean isConcurrencyConflict) {
+    return this.isConcurrencyConflict = isConcurrencyConflict;
+  }
+
   /**
    * @param tailKey the tailKey to set
    */
@@ -1141,7 +1167,7 @@ public class GatewaySenderEventImpl
 
   @Override
   public Version[] getSerializationVersions() {
-    return null;
+    return new Version[] {Version.GEODE_180};
   }
 
   public int getSerializedValueSize() {