You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2018/01/21 01:45:15 UTC

[geode] 01/01: GEODE-3967: when ConcurrentCacheModificationException happened. GatewaySenderEventImpl should save the status and notify gatewaysender anyway. SerialGatewaySender will handle it. In AbstractUpdateOperation's doPutOrCreate's 3 tries of basicUpdate, the 3rd one should allow both create and update.

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-3967
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 68054778c8030a88281cada6a693e72b935c9144
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Sat Jan 20 17:39:00 2018 -0800

    GEODE-3967: when ConcurrentCacheModificationException happened. GatewaySenderEventImpl should save the status and notify gatewaysender anyway. SerialGatewaySender will handle it. In AbstractUpdateOperation's doPutOrCreate's 3 tries of basicUpdate, the 3rd one should allow both create and update.
---
 .../apache/geode/internal/cache/AbstractRegionMap.java |  3 +++
 .../geode/internal/cache/AbstractUpdateOperation.java  |  2 +-
 .../org/apache/geode/internal/cache/LocalRegion.java   | 18 +++++++++++++-----
 .../internal/cache/wan/GatewaySenderEventImpl.java     |  6 +++++-
 .../wan/serial/SerialGatewaySenderEventProcessor.java  |  9 ++++++---
 5 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 487c35a..1fe9c00 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1185,6 +1185,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                               true/* conflict with clear */, duringRI, true);
                           doPart3 = true;
                         } catch (ConcurrentCacheModificationException ccme) {
+                          event.isConcurrencyConflict(true);
                           VersionTag tag = event.getVersionTag();
                           if (tag != null && tag.isTimeStampUpdated()) {
                             // Notify gateways of new time-stamp.
@@ -2096,6 +2097,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                   }
                 } // !opCompleted
               } catch (ConcurrentCacheModificationException ccme) {
+                event.isConcurrencyConflict(true);
                 VersionTag tag = event.getVersionTag();
                 if (tag != null && tag.isTimeStampUpdated()) {
                   // Notify gateways of new time-stamp.
@@ -2854,6 +2856,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                     clearOccured = true;
                     owner.recordEvent(event);
                   } catch (ConcurrentCacheModificationException ccme) {
+                    event.isConcurrencyConflict(true);
                     VersionTag tag = event.getVersionTag();
                     if (tag != null && tag.isTimeStampUpdated()) {
                       // Notify gateways of new time-stamp.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 585e131..a706abd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -175,7 +175,7 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
                   || (rgn.getDataPolicy().withReplication() && rgn.getConcurrencyChecksEnabled())) {
                 overwriteDestroyed = true;
                 ev.makeCreate();
-                rgn.basicUpdate(ev, true /* ifNew */, false/* ifOld */, lastMod,
+                rgn.basicUpdate(ev, false /* ifNew */, false/* ifOld */, lastMod,
                     overwriteDestroyed);
                 rgn.getCachePerfStats().endPut(startPut, ev.isOriginRemote());
                 updated = true;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index aca96d0..4a9ff44 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -2840,6 +2840,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
               logger.debug("caught concurrent modification attempt when applying {}", event);
             }
             notifyBridgeClients(event);
+            notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+                : EnumListenerEvent.AFTER_CREATE, event);
           }
           if (!getDataView().isDeferredStats()) {
             getCachePerfStats().endPut(startPut, event.isOriginRemote());
@@ -5618,6 +5620,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
         logger.debug("caught concurrent modification attempt when applying {}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
       return false;
     }
 
@@ -5844,6 +5848,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     updateTimeStampEvent.setGenerateCallbacks(false);
     updateTimeStampEvent.distributedMember = event.getDistributedMember();
     updateTimeStampEvent.setNewEventId(getSystem());
+    if (event.isConcurrencyConflict()) {
+      updateTimeStampEvent.isConcurrencyConflict(true);
+    }
 
     if (event.getRegion() instanceof BucketRegion) {
       BucketRegion bucketRegion = (BucketRegion) event.getRegion();
@@ -6100,8 +6107,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6125,9 +6131,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     if (allRemoteDSIds != null) {
       for (GatewaySender sender : getCache().getAllGatewaySenders()) {
         if (allGatewaySenderIds.contains(sender.getId())) {
-          // TODO: This is a BUG. Why return and not continue?
-          if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
-            return;
+          // if isConcurrencyConflict is true, only notify serial gateway sender
+          if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict())
+              && sender.isParallel()) {
+            continue;
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Notifying the GatewaySender : {}", sender.getId());
@@ -6485,6 +6492,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
       // Notify clients only if its NOT a gateway event.
       if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
+        notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       }
       return true; // event was elided
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 2748c7d..e744afc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -171,6 +171,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  public boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -312,6 +314,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
@@ -744,7 +747,8 @@ public class GatewaySenderEventImpl
         .append(";creationTime=").append(this.creationTime).append(";shadowKey= ")
         .append(this.shadowKey).append(";timeStamp=").append(this.versionTimeStamp)
         .append(";acked=").append(this.isAcked).append(";dispatched=").append(this.isDispatched)
-        .append(";bucketId=").append(this.bucketId).append("]");
+        .append(";bucketId=").append(this.bucketId).append(";isConcurrencyConflict=")
+        .append(this.isConcurrencyConflict).append("]");
     return buffer.toString();
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..d5d0baa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -35,6 +35,7 @@ import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.wan.GatewayQueueEvent;
@@ -423,9 +424,11 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
         } else {
           // If it is not, create an uninitialized GatewayEventImpl and
           // put it into the map of unprocessed events.
-          senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP
-                                                                                              // ok
-          handleSecondaryEvent(senderEvent);
+          if (!event.getOperation().equals(Operation.UPDATE_VERSION_STAMP)) {
+            senderEvent = new GatewaySenderEventImpl(operation, event, substituteValue, false); // OFFHEAP
+                                                                                                // ok
+            handleSecondaryEvent(senderEvent);
+          }
         }
       }
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.