Posted to commits@geode.apache.org by zh...@apache.org on 2018/01/25 08:14:50 UTC

[geode] branch feature/GEODE-3967 updated (15c3600 -> 291b1c1)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-3967
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit 15c3600  GEODE-3967: when ConcurrentCacheModificationException happened. GatewaySenderEventImpl should save the status and notify gatewaysender anyway. SerialGatewaySender will handle it. In AbstractUpdateOperation's doPutOrCreate's 3 tries of basicUpdate, the 3rd one should allow both create and update.             make event with CME to be ignored by dispatcher
     add 4d46ba5  GEODE-4079 Identify hash indexes as deprecated in docs for gfsh create index command
     add 1f06a68  GEODE-4365: do not call GemFireCacheImpl.getExisting (#1331)
     add 1c95035  GEODE-4355: use Awaitility to fix ControlFileWatchdogIntegrationTest flakiness (#1329)
     add ee9b51b  GEODE-4358: Added check for null updater
     add f4b7f6d  GEODE-4374: Mark ShutdownCommandOverHttpDUnitTest as Flaky
     new 291b1c1  GEODE-3967: when ConcurrentCacheModificationException happened. GatewaySenderEventImpl should save the status and notify gatewaysender anyway. SerialGatewaySender will handle it. In AbstractUpdateOperation's doPutOrCreate's 3 tries of basicUpdate, the 3rd one should allow both create and update.             make event with CME to be ignored by dispatcher

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (15c3600)
            \
             N -- N -- N   refs/heads/feature/GEODE-3967 (291b1c1)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../cache/client/internal/QueueConnectionImpl.java |  4 +-
 .../cache/client/internal/QueueManagerImpl.java    |  6 ++-
 .../geode/internal/cache/TXCommitMessage.java      |  2 +-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 59 +++++++++++-----------
 .../internal/cache/wan/GatewaySenderEventImpl.java |  3 +-
 ...Test.java => QueueConnectionImplJUnitTest.java} | 33 +++++++-----
 .../ControlFileWatchdogIntegrationTest.java        | 15 +++---
 .../gfsh/command-pages/create.html.md.erb          |  2 +-
 .../commands/ShutdownCommandOverHttpDUnitTest.java |  3 +-
 9 files changed, 69 insertions(+), 58 deletions(-)
 copy geode-core/src/test/java/org/apache/geode/cache/client/internal/{AbstractOpTest.java => QueueConnectionImplJUnitTest.java} (50%)

-- 
To stop receiving notification emails like this one, please contact
zhouxj@apache.org.

[geode] 01/01: GEODE-3967: when ConcurrentCacheModificationException happened. GatewaySenderEventImpl should save the status and notify gatewaysender anyway. SerialGatewaySender will handle it. In AbstractUpdateOperation's doPutOrCreate's 3 tries of basicUpdate, the 3rd one should allow both create and update. make event with CME to be ignored by dispatcher

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-3967
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 291b1c12154e7f58bd7a3a63e6981b0c405ebd80
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Sat Jan 20 17:39:00 2018 -0800

    GEODE-3967: When a ConcurrentCacheModificationException happens, GatewaySenderEventImpl should save the status and notify the gateway sender anyway. SerialGatewaySender will handle it. Of the 3 tries of basicUpdate in AbstractUpdateOperation's doPutOrCreate, the 3rd one should allow both create and update.
                Make events with a CME be ignored by the dispatcher.
---
 .../internal/cache/AbstractUpdateOperation.java    |  2 +-
 .../apache/geode/internal/cache/LocalRegion.java   |  6 ++-
 .../wan/AbstractGatewaySenderEventProcessor.java   | 51 +++++++++++++---------
 .../internal/cache/wan/GatewaySenderEventImpl.java | 34 ++++++++++++---
 .../serial/SerialGatewaySenderEventProcessor.java  |  9 ++--
 .../cache30/DistributedAckRegionCCEDUnitTest.java  |  4 +-
 .../codeAnalysis/sanctionedDataSerializables.txt   |  8 ++--
 7 files changed, 79 insertions(+), 35 deletions(-)
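
Note on the AbstractUpdateOperation change: the sketch below illustrates what
"allow both create and update" means for the third basicUpdate try. It is a
minimal stand-in, not Geode code. The class PutFlagsSketch is hypothetical,
and the flag semantics (ifNew = create-only, ifOld = update-only, both false =
unconditional) are an assumption inferred from the ifNew/ifOld comments in the
diff and from the commit message.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a region; this is not Geode's LocalRegion. */
final class PutFlagsSketch {
  private final Map<String, String> entries = new HashMap<>();

  /**
   * Assumed semantics: ifNew applies the value only when the key is absent,
   * ifOld applies it only when the key is present, and with both flags false
   * the value is applied unconditionally.
   */
  boolean basicUpdate(String key, String value, boolean ifNew, boolean ifOld) {
    boolean exists = entries.containsKey(key);
    if (ifNew && exists) {
      return false; // create-only, but the key is already there
    }
    if (ifOld && !exists) {
      return false; // update-only, but the key is missing
    }
    entries.put(key, value);
    return true;
  }

  String get(String key) {
    return entries.get(key);
  }
}
```

Under these assumptions, a create-only third try can still fail if another
thread created the entry between tries, whereas a try with both flags false
cannot be rejected on existence grounds.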

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 585e131..a706abd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -175,7 +175,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
                   || (rgn.getDataPolicy().withReplication() && rgn.getConcurrencyChecksEnabled())) {
                 overwriteDestroyed = true;
                 ev.makeCreate();
-                rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */, lastMod,
+                rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */, lastMod,
                     overwriteDestroyed);
                 rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
                 updated = true;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index aca96d0..f5822ab 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -5618,6 +5618,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
         logger.debug("caught concurrent modification attempt when applying {}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
       return false;
     }
 
@@ -6100,8 +6102,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6485,6 +6486,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       // Notify clients only if its NOT a gateway event.
       if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
+        notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       }
       return true; // event was elided
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7a2cee1..7d09335 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -508,27 +508,38 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
           }
 
           // Filter the events
-          for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
-            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
-            while (itr.hasNext()) {
-              GatewayQueueEvent event = itr.next();
-
-              // This seems right place to prevent transmission of UPDATE_VERSION events if
-              // receiver's
-              // version is < 7.0.1, especially to prevent another loop over events.
-              if (!sendUpdateVersionEvents
-                  && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
-                if (isTraceEnabled) {
-                  logger.trace(
-                      "Update Event Version event: {} removed from Gateway Sender queue: {}", event,
-                      sender);
-                }
+          Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
+          while (itr.hasNext()) {
+            GatewayQueueEvent event = itr.next();
+
+            // This seems right place to prevent transmission of UPDATE_VERSION events if
+            // receiver's
+            // version is < 7.0.1, especially to prevent another loop over events.
+            if (!sendUpdateVersionEvents
+                && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
+              if (isDebugEnabled) {
+                logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}",
+                    event, sender);
+              }
 
-                itr.remove();
-                statistics.incEventsNotQueued();
-                continue;
+              itr.remove();
+              statistics.incEventsNotQueued();
+              continue;
+            }
+
+            if (((GatewaySenderEventImpl) event).isConcurrencyConflict()) {
+              if (isDebugEnabled) {
+                logger.debug(
+                    "Event with concurrent modification conflict: {} will be removed from Gateway Sender queue: {}",
+                    event, sender);
               }
 
+              itr.remove();
+              statistics.incEventsNotQueued();
+              continue;
+            }
+
+            for (GatewayEventFilter filter : sender.getGatewayEventFilters()) {
               boolean transmit = filter.beforeTransmit(event);
               if (!transmit) {
                 if (isDebugEnabled) {
@@ -549,8 +560,8 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
           // AsyncEventQueue since possibleDuplicate flag is not used in WAN.
           if (this.getSender().isParallel()
               && (this.getDispatcher() instanceof GatewaySenderEventCallbackDispatcher)) {
-            Iterator<GatewaySenderEventImpl> itr = filteredList.iterator();
-            while (itr.hasNext()) {
+            Iterator<GatewaySenderEventImpl> eventItr = filteredList.iterator();
+            while (eventItr.hasNext()) {
-              GatewaySenderEventImpl event = (GatewaySenderEventImpl) itr.next();
+              GatewaySenderEventImpl event = (GatewaySenderEventImpl) eventItr.next();
               PartitionedRegion qpr = null;
               if (this.getQueue() instanceof ConcurrentParallelGatewaySenderQueue) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 2748c7d..184c2c6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -33,6 +33,7 @@ import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.VersionedDataInputStream;
+import org.apache.geode.internal.VersionedDataSerializable;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.CachedDeserializableFactory;
 import org.apache.geode.internal.cache.Conflatable;
@@ -61,8 +62,8 @@ import org.apache.geode.internal.size.Sizeable;
  * @since GemFire 7.0
  *
  */
-public class GatewaySenderEventImpl
-    implements AsyncEvent, DataSerializableFixedID, Conflatable, Sizeable, Releasable {
+public class GatewaySenderEventImpl implements AsyncEvent, DataSerializableFixedID, Conflatable,
+    Sizeable, Releasable, VersionedDataSerializable {
   private static final long serialVersionUID = -5690172020872255422L;
 
   protected static final Object TOKEN_NULL = new Object();
@@ -171,6 +172,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  private transient boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -312,6 +315,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
@@ -673,7 +677,13 @@ public class GatewaySenderEventImpl
     return GATEWAY_SENDER_EVENT_IMPL;
   }
 
+  @Override
   public void toData(DataOutput out) throws IOException {
+    toDataPre_GEODE_1_4_0_0(out);
+    DataSerializer.writeBoolean(this.isConcurrencyConflict, out);
+  }
+
+  public void toDataPre_GEODE_1_4_0_0(DataOutput out) throws IOException {
     // Make sure we are initialized before we serialize.
     initialize();
     out.writeShort(VERSION);
@@ -697,7 +707,13 @@ public class GatewaySenderEventImpl
     DataSerializer.writeObject(this.key, out);
   }
 
+  @Override
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    fromDataPre_GEODE_1_4_0_0(in);
+    this.isConcurrencyConflict = DataSerializer.readBoolean(in);
+  }
+
+  public void fromDataPre_GEODE_1_4_0_0(DataInput in) throws IOException, ClassNotFoundException {
     short version = in.readShort();
     if (version != VERSION) {
       // warning?`
@@ -744,7 +760,8 @@ public class GatewaySenderEventImpl
         .append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
         .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
         .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
-        .append(";bucketId=").append(this.bucketId).append("]");
+        .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
+        .append(this.isConcurrencyConflict).append("]");
     return buffer.toString();
   }
 
@@ -1128,6 +1145,14 @@ public class GatewaySenderEventImpl
     return bucketId;
   }
 
+  public boolean isConcurrencyConflict() {
+    return isConcurrencyConflict;
+  }
+
+  public boolean setConcurrencyConflict(boolean isConcurrencyConflict) {
+    return this.isConcurrencyConflict = isConcurrencyConflict;
+  }
+
   /**
    * @param tailKey the tailKey to set
    */
@@ -1144,8 +1169,7 @@ public class GatewaySenderEventImpl
 
   @Override
   public Version[] getSerializationVersions() {
-    // TODO Auto-generated method stub
-    return null;
+    return new Version[] {Version.GEODE_140};
   }
 
   public int getSerializedValueSize() {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..d5d0baa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -35,6 +35,7 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
@@ -423,9 +424,11 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
         } else {
           // If it is not, create an uninitialized GatewayEventImpl and
           // put it into the map of unprocessed events.
-          senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP
-                                                                                              // ok
-          handleSecondaryEvent(senderEvent);
+          if (!event.getOperation().equals(Operation.UPDATE_VERSION_STAMP)) {
+            senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP
+                                                                                                // ok
+            handleSecondaryEvent(senderEvent);
+          }
         }
       }
     }
diff --git a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
index 1016a4b..bf6a611 100644
--- a/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java
@@ -476,7 +476,9 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT
       CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, true, holder);
       vm0.invoke(new SerializableRunnable("check conflation count") {
         public void run() {
-          assertEquals("expected one conflated event", 1,
+          // after changing the 3rd try of AUO.doPutOrCreate to ifOld=false, ifNew=false,
+          // ARM.updateEntry is called one more time, so there will be 2 conflated events
+          assertEquals("expected two conflated events", 2,
               CCRegion.getCachePerfStats().getConflatedEventsCount());
         }
       });
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 1c5a36d..1f84823 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1988,9 +1988,11 @@ org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
 fromData,63,2a2bb700182a2bb80019b60017b500032abb00075905b7000ab500062bb9001a01003d033e1d1ca200172ab400062bb9001a0100b6001b57840301a7ffeab1
 toData,87,2a2bb700112ab40003b800122bb800132ab40006c6003b2b2ab40006b60014b9001502002ab40006b600164d2cb9000c010099001a2cb9000d0100c0000e4e2b2db60017b900150200a7ffe3a7000a2b03b900150200b1
 
-org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,2
-fromData,183,2bb9007201003d1c10119f00032a04b5002b2a2bb900730100b500282a2bb900730100b500291c1011a200232bc1007499001c2bb80075b20076a60012bb0077592bc00074b20078b700794c2a2bb8007ac0007bb5002a2a2bb8007cb500102a2bb9007d0100b5002e2a2bb6007e2a2bb8007fb500302a2bb8007ac00020b500212a2bb900800100b500132a2bb900810100b500172a2bb900730100b500092a2bb900810100b80004b500052a2bb900810100b5001bb1
-toData,133,2ab600272b1011b9006702002b2ab40028b9006802002b2ab40029b9006802002ab4002a2bb800692ab400102bb8006a2b2ab4002eb9006b02002a2bb6006c2ab6002f2bb8006d2ab400212bb800692b2ab40013b9006e02002b2ab40017b9006f03002b2ab40009b9006802002b2ab40005b60070b9006f03002b2ab60071b9006f0300b1
+org/apache/geode/internal/cache/wan/GatewaySenderEventImpl,4
+fromData,17,2a2bb600772a2bb80078b60079b50006b1
+fromDataPre_GEODE_1_4_0_0,183,2bb9007a01003d1c10119f00032a04b5002d2a2bb9007b0100b5002a2a2bb9007b0100b5002b1c1011a200232bc1007c99001c2bb8007db2007ea60012bb007f592bc0007cb20080b700814c2a2bb80082c00083b5002c2a2bb80084b500112a2bb900850100b500302a2bb600862a2bb80087b500322a2bb80082c00021b500222a2bb900880100b500142a2bb900890100b500182a2bb9007b0100b5000a2a2bb900890100b80004b500052a2bb900890100b5001cb1
+toData,17,2a2bb600692ab40006b8006a2bb8006bb1
+toDataPre_GEODE_1_4_0_0,133,2ab600282b1011b9006c02002b2ab4002ab9006d02002b2ab4002bb9006d02002ab4002c2bb8006e2ab400112bb8006f2b2ab40030b9007002002a2bb600712ab600312bb800722ab400222bb8006e2b2ab40014b9007302002b2ab40018b9007403002b2ab4000ab9006d02002b2ab40005b60075b9007403002b2ab60076b900740300b1
 
 org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation$GatewaySenderQueueEntrySynchronizationEntry,2
 fromData,20,2a2bb80006b500022a2bb80006c00007b50001b1
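
Note on the AbstractGatewaySenderEventProcessor change: the diff moves the
iterator loop outside the loop over filters, so the UPDATE_VERSION_STAMP and
concurrency-conflict checks run once per event even when no
GatewayEventFilter is configured. The sketch below shows that loop shape in
isolation. All names in it (DispatchFilterSketch, Event, filter) are
hypothetical, and java.util.function.Predicate stands in for
GatewayEventFilter.beforeTransmit.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of the single-pass batch filtering; not Geode code. */
final class DispatchFilterSketch {

  /** Minimal event carrying only what the sketch needs. */
  static final class Event {
    final String key;
    final boolean concurrencyConflict;

    Event(String key, boolean concurrencyConflict) {
      this.key = key;
      this.concurrencyConflict = concurrencyConflict;
    }
  }

  /**
   * Removes conflicted events first, then events rejected by any filter.
   * Returns the number of events removed from the batch.
   */
  static int filter(List<Event> batch, List<Predicate<Event>> filters) {
    int dropped = 0;
    Iterator<Event> itr = batch.iterator();
    while (itr.hasNext()) {
      Event event = itr.next();
      // Runs for every event, independent of how many filters exist.
      if (event.concurrencyConflict) {
        itr.remove();
        dropped++;
        continue;
      }
      for (Predicate<Event> filter : filters) {
        if (!filter.test(event)) {
          itr.remove();
          dropped++;
          break; // the event is gone; do not consult further filters
        }
      }
    }
    return dropped;
  }
}
```

With the filter loop on the outside, as in the removed lines, a sender with an
empty filter list would never reach the per-event checks.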

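Note on the GatewaySenderEventImpl change: the new toData/fromData methods
delegate to the Pre_GEODE_1_4_0_0 variants and then append or read one extra
boolean, so the older wire format stays a prefix of the new one. The sketch
below reproduces that layering with plain java.io streams. It is an
illustration under assumptions: VersionedEventSketch, its single int field,
and the peerIsOld switch are hypothetical, and in Geode the choice between the
two formats is presumably made by the serialization framework based on
getSerializationVersions(), not by a boolean argument.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical event with an old wire format and a new one that extends it. */
final class VersionedEventSketch {
  int bucketId;
  boolean concurrencyConflict;

  // Old format: only the fields that existed before the new flag.
  void toDataPre(DataOutput out) throws IOException {
    out.writeInt(bucketId);
  }

  // New format: the old format followed by the new flag.
  void toData(DataOutput out) throws IOException {
    toDataPre(out);
    out.writeBoolean(concurrencyConflict);
  }

  void fromDataPre(DataInput in) throws IOException {
    bucketId = in.readInt();
  }

  void fromData(DataInput in) throws IOException {
    fromDataPre(in);
    concurrencyConflict = in.readBoolean();
  }

  /** Serializes in the old format when the peer is assumed to be old. */
  static byte[] serialize(VersionedEventSketch event, boolean peerIsOld) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      if (peerIsOld) {
        event.toDataPre(out);
      } else {
        event.toData(out);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the format that the sending peer is assumed to have written. */
  static VersionedEventSketch deserialize(byte[] data, boolean peerIsOld) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      VersionedEventSketch event = new VersionedEventSketch();
      if (peerIsOld) {
        event.fromDataPre(in);
      } else {
        event.fromData(in);
      }
      return event;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

An event read in the old format simply keeps the flag at its default of false,
which matches the field initializer added in the diff.
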