You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/11/17 17:56:43 UTC

[geode] branch feature/GEM-883 updated (13e02d1 -> 3018533)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEM-883
in repository https://gitbox.apache.org/repos/asf/geode.git.


    omit 13e02d1  GEODE-3967: change to use logger
    omit 0ef3579  GEODE-3967: isConcurrencyConflict is set for 3 notifyTimestampsToGateways
    omit 744c74d  GEODE-3967: add more trace
    omit 42aec6a  GEODE-3967: add a case for invalidate
    omit 2a7e453  GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender
     new 3018533  GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (13e02d1)
            \
             N -- N -- N   refs/heads/feature/GEM-883 (3018533)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.

[geode] 01/01: GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEM-883
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 30185338b67b3954eac1c2ca46021a86efb71b89
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Thu Nov 9 23:49:29 2017 -0800

    GEODE-3967: if put hits concurrent modification exception should still notify serial gateway sender
---
 .../apache/geode/internal/cache/AbstractRegionMap.java   |  3 +++
 .../apache/geode/internal/cache/DestroyOperation.java    |  3 ---
 .../geode/internal/cache/DistributedCacheOperation.java  | 15 ++++++++++++++-
 .../org/apache/geode/internal/cache/LocalRegion.java     | 16 +++++++++++-----
 .../apache/geode/internal/cache/LocalRegionDataView.java |  9 +++++++++
 .../cache/wan/AbstractGatewaySenderEventProcessor.java   | 15 ++++++++++-----
 .../geode/internal/cache/wan/GatewaySenderEventImpl.java |  3 +++
 .../wan/serial/SerialGatewaySenderEventProcessor.java    |  8 +++++++-
 8 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index da0cf59..ee0a4aa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -1188,6 +1188,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                               true/* conflict with clear */, duringRI, true);
                           doPart3 = true;
                         } catch (ConcurrentCacheModificationException ccme) {
+                          event.isConcurrencyConflict(true);
                           VersionTag tag = event.getVersionTag();
                           if (tag != null && tag.isTimeStampUpdated()) {
                             // Notify gateways of new time-stamp.
@@ -2097,6 +2098,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                   }
                 } // !opCompleted
               } catch (ConcurrentCacheModificationException ccme) {
+                event.isConcurrencyConflict(true);
                 VersionTag tag = event.getVersionTag();
                 if (tag != null && tag.isTimeStampUpdated()) {
                   // Notify gateways of new time-stamp.
@@ -2854,6 +2856,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                     clearOccured = true;
                     owner.recordEvent(event);
                   } catch (ConcurrentCacheModificationException ccme) {
+                    event.isConcurrencyConflict(true);
                     VersionTag tag = event.getVersionTag();
                     if (tag != null && tag.isTimeStampUpdated()) {
                       // Notify gateways of new time-stamp.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
index 20cbd28..4d376c6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
@@ -95,9 +95,6 @@ public class DestroyOperation extends DistributedCacheOperation {
 
       } catch (EntryNotFoundException e) {
         dispatchElidedEvent(rgn, ev);
-        if (!ev.isConcurrencyConflict()) {
-          rgn.notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, ev);
-        }
         throw e;
       } catch (CacheWriterException e) {
         throw new Error(
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 505c618..5ab3a4a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -1290,11 +1290,24 @@ public abstract class DistributedCacheOperation {
      */
     protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
       if (logger.isDebugEnabled()) {
-        logger.debug("dispatching elided event: {}", ev);
+        logger.debug("GGG:dispatching elided event: {}", ev, new Exception());
       }
       ev.isConcurrencyConflict(true);
       rgn.generateLocalFilterRouting(ev);
       rgn.notifyBridgeClients(ev);
+      rgn.notifyGatewaySender(getOperation(ev), ev);
+    }
+
+    private EnumListenerEvent getOperation(EntryEventImpl ev) {
+      if (ev.getOperation().isInvalidate()) {
+        return EnumListenerEvent.AFTER_INVALIDATE;
+      } else if (ev.getOperation().isDestroy()) {
+        return EnumListenerEvent.AFTER_DESTROY;
+      } else if (ev.getOperation().isUpdate()) {
+        return EnumListenerEvent.AFTER_UPDATE;
+      } else {
+        return EnumListenerEvent.AFTER_CREATE;
+      }
     }
 
     protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bed336a..158ff68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -2851,6 +2851,8 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
               logger.debug("caught concurrent modification attempt when applying {}", event);
             }
             notifyBridgeClients(event);
+            notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+                : EnumListenerEvent.AFTER_CREATE, event);
           }
           if (!getDataView().isDeferredStats()) {
             getCachePerfStats().endPut(startPut, event.isOriginRemote());
@@ -5624,6 +5626,9 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
         logger.debug("caught concurrent modification attempt when applying {}", event);
       }
       notifyBridgeClients(event);
+      notifyGatewaySender(event.getOperation().isUpdate() ? EnumListenerEvent.AFTER_UPDATE
+          : EnumListenerEvent.AFTER_CREATE, event);
+
       return false;
     }
 
@@ -6111,8 +6116,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
-    if (isPdxTypesRegion() || event.isConcurrencyConflict()) {
-      // isConcurrencyConflict is usually a concurrent cache modification problem
+    if (isPdxTypesRegion()) {
       return;
     }
 
@@ -6136,9 +6140,10 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
     if (allRemoteDSIds != null) {
       for (GatewaySender sender : getCache().getAllGatewaySenders()) {
         if (allGatewaySenderIds.contains(sender.getId())) {
-          // TODO: This is a BUG. Why return and not continue?
-          if (!this.getDataPolicy().withStorage() && sender.isParallel()) {
-            return;
+          // if isConcurrencyConflict is true, only notify serial gateway sender
+          if ((!this.getDataPolicy().withStorage() || event.isConcurrencyConflict())
+              && sender.isParallel()) {
+            continue;
           }
           if (logger.isDebugEnabled()) {
             logger.debug("Notifying the GatewaySender : {}", sender.getId());
@@ -6497,6 +6502,7 @@ public class LocalRegion extends AbstractRegion implements InternalRegion, Loade
       if (event.getVersionTag() != null && !event.getVersionTag().isGatewayTag()) {
         notifyBridgeClients(event);
       }
+      notifyGatewaySender(EnumListenerEvent.AFTER_DESTROY, event);
       return true; // event was elided
 
     } catch (DiskAccessException dae) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index eed6176..b68859e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -25,6 +25,7 @@ import org.apache.geode.internal.cache.entries.AbstractRegionEntry;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
+import org.apache.geode.internal.logging.LogService;
 
 /**
  *
@@ -71,6 +72,10 @@ public class LocalRegionDataView implements InternalDataView {
     } catch (ConcurrentCacheModificationException e) {
       // a newer event has already been applied to the cache. this can happen
       // in a client cache if another thread is operating on the same key
+      event.isConcurrencyConflict(true);
+      LocalRegion lr = event.getLocalRegion();
+      LogService.getLogger().info("GGG:invalidateExistingEntry:" + event, new Exception());
+      // lr.notifyGatewaySender(EnumListenerEvent.AFTER_INVALIDATE, event);
     }
   }
 
@@ -81,6 +86,10 @@ public class LocalRegionDataView implements InternalDataView {
     } catch (ConcurrentCacheModificationException e) {
       // a later in time event has already been applied to the cache. this can happen
       // in a cache if another thread is operating on the same key
+      event.isConcurrencyConflict(true);
+      LocalRegion lr = event.getLocalRegion();
+      LogService.getLogger().info("GGG:updateEntryVersion:" + event, new Exception());
+      // lr.notifyGatewaySender(EnumListenerEvent.TIMESTAMP_UPDATE, event);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 7a2cee1..a557875 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -518,16 +518,21 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
               // version is < 7.0.1, especially to prevent another loop over events.
               if (!sendUpdateVersionEvents
                   && event.getOperation() == Operation.UPDATE_VERSION_STAMP) {
-                if (isTraceEnabled) {
-                  logger.trace(
-                      "Update Event Version event: {} removed from Gateway Sender queue: {}", event,
-                      sender);
-                }
+                logger.debug("Update Event Version event: {} removed from Gateway Sender queue: {}",
+                    event, sender);
 
                 itr.remove();
                 statistics.incEventsNotQueued();
                 continue;
               }
+              if (((GatewaySenderEventImpl) event).isConcurrencyConflict) {
+                if (isDebugEnabled) {
+                  logger.debug("primary should ignore the concurrency conflict event:" + event);
+                }
+                itr.remove();
+                statistics.incEventsNotQueued();
+                continue;
+              }
 
               boolean transmit = filter.beforeTransmit(event);
               if (!transmit) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 4d201b2..d28dc5b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -175,6 +175,8 @@ public class GatewaySenderEventImpl
 
   protected boolean isInitialized;
 
+  public boolean isConcurrencyConflict = false;
+
   /**
    * Is this thread in the process of serializing this event?
    */
@@ -316,6 +318,7 @@ public class GatewaySenderEventImpl
     if (initialize) {
       initialize();
     }
+    this.isConcurrencyConflict = event.isConcurrencyConflict();
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
index 734b560..7ecb233 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderEventProcessor.java
@@ -379,6 +379,7 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
       if (m != null) {
         for (EventWrapper ew : m.values()) {
           GatewaySenderEventImpl gatewayEvent = ew.event;
+          logger.info("GGG:releaseUnprocessedEvents:" + gatewayEvent);
           gatewayEvent.release();
         }
         this.unprocessedEvents = null;
@@ -711,8 +712,13 @@ public class SerialGatewaySenderEventProcessor extends AbstractGatewaySenderEven
       // @todo add an assertion that !getPrimary()
       // now we can safely use the unprocessedEvents field
       Long v = this.unprocessedTokens.remove(gatewayEvent.getEventId());
+      if (v != null && gatewayEvent.isConcurrencyConflict) {
+        logger.info("GGG:secondary after removed:" + v + ":" + gatewayEvent);
+      }
 
-      if (v == null) {
+      if (v == null && !gatewayEvent.isConcurrencyConflict) {
+        // only when isConcurrencyConflict is false, add the event into unprocessedEvents
+        logger.info("GGG:secondary before add to:" + gatewayEvent, new Exception());
         // first time for the event
         if (logger.isTraceEnabled()) {
           logger.trace("{}: fromSecondary event {}:{}->{} added from unprocessed events map",

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.