You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/11/14 00:20:18 UTC

[geode] branch feature/GEM-883 updated (1aeafab -> 335cce9)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEM-883
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit 1aeafab  GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender
     new 335cce9  GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (1aeafab)
            \
             N -- N -- N   refs/heads/feature/GEM-883 (335cce9)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../internal/cache/wan/AbstractGatewaySenderEventProcessor.java   | 8 ++++++++
 1 file changed, 8 insertions(+)

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.

[geode] 01/01: GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEM-883
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 335cce92548f18f6a376947f8df2b8af22d8da51
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Thu Nov 9 23:49:29 2017 -0800

    GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender
---
 .../org/apache/geode/internal/cache/LocalRegion.java     | 16 +++++++++++-----
 .../cache/wan/AbstractGatewaySenderEventProcessor.java   |  8 ++++++++
 .../geode/internal/cache/wan/GatewaySenderEventImpl.java |  3 +++
 .../wan/serial/SerialGatewaySenderEventProcessor.java    |  2 +-
 4 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bed336a..158ff68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -2851,6 +2851,8 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
               logger.debug("caught concurrent modification attempt when applying {}", event);
             }
             notifyBridgeClients(event);
+            notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+                : EnumListenerEvent.AFTER_CREATE, event);
           }
           if (!getDataView().isDeferredStats()) {
             getCachePerfStats().endPut(startPut, event.isOriginRemote());
@@ -5624,6 +5626,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
         logger.debug("caught concurrent modification attempt when applying {}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
+
       return false;
     }
 
@@ -6111,8 +6116,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6136,9 +6140,10 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
     if (allRemoteDSIds != null) {
       for (GatewaySender sender : getCache().getAllGatewaySenders()) {
         if (allGatewaySenderIds.contains(sender.getId())) {
-          // TODO: This is a BUG. Why return and not continue?
-          if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
-            return;
+          // if isConcurrencyConflict is true, only notify serial gateway sender
+          if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict())
+              && sender.isParallel()) {
+            continue;
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Notifying the GatewaySender : {}", sender.getId());
@@ -6497,6 +6502,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
       if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
       }
+      notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       return true; // event was elided
 
     } catch (DiskAccessException dae) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7a2cee1..f94c21d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -528,6 +528,14 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
                 statistics.incEventsNotQueued();
                 continue;
               }
+              if (((GatewaySenderEventImpl) event).isConcurrencyConflict) {
+                if (isDebugEnabled) {
+                  logger.debug("primary should ignore the concurrency conflict event:" + event);
+                }
+                itr.remove();
+                statistics.incEventsNotQueued();
+                continue;
+              }
 
               boolean transmit = filter.beforeTransmit(event);
               if (!transmit) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 4d201b2..d28dc5b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -175,6 +175,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  public boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -316,6 +318,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..e9c4d28 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -712,7 +712,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
       // now we can safely use the unprocessedEvents field
       Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());
 
-      if (v == null) {
+      if (v == null && !gatewayEvent.isConcurrencyConflict) {
         // first time for the event
         if (logger.isTraceEnabled()) {
           logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map",

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.