You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/14 22:34:55 UTC

geode git commit: GEODE-2787: When distribution happens, it calls startOperation() to increase a count, then calls endOperation() to decrease the count.

Repository: geode
Updated Branches:
  refs/heads/develop 5c5c94797 -> 7e607de71


GEODE-2787: When distribution happens, it calls startOperation() to increase a count, then calls endOperation() to decrease the count.

State flush waits for this count to become 0.

But notifyGateway() is called after distribute(), so there is a race in which the state flush can finish before notifyGateway() has completed.

The fix is to move the endOperation() call so that it runs after the callbacks have been invoked.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/7e607de7
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/7e607de7
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/7e607de7

Branch: refs/heads/develop
Commit: 7e607de71163200ce3741664dd64d32f692acc4f
Parents: 5c5c947
Author: zhouxh <gz...@pivotal.io>
Authored: Fri Apr 7 18:43:43 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Fri Apr 14 15:31:25 2017 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractUpdateOperation.java |  19 --
 .../geode/internal/cache/BucketRegion.java      | 256 ++++++++++++-------
 .../cache/DistributedCacheOperation.java        |  86 +++++--
 .../geode/internal/cache/DistributedRegion.java |  18 +-
 .../geode/internal/cache/LocalRegion.java       |   6 +-
 .../internal/cache/LocalRegionDataView.java     |  24 +-
 .../geode/internal/cache/PartitionedRegion.java |   6 +-
 .../geode/internal/cache/Bug45934DUnitTest.java |   4 +-
 8 files changed, 270 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 55df601..26fdde5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -40,8 +40,6 @@ import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationExcep
 import org.apache.geode.internal.cache.versions.VersionTag;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LogMarker;
-import org.apache.geode.internal.util.DelayedAction;
 
 /**
  * Common code for both UpdateOperation and DistributedPutAllOperation.
@@ -55,7 +53,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
       value = "UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD",
       justification = "test hook that is unset normally")
-  public static volatile DelayedAction test_InvalidVersionAction;
 
   private final long lastModifiedTime;
 
@@ -65,22 +62,6 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
   }
 
   @Override
-  public void distribute() {
-    try {
-      super.distribute();
-    } catch (InvalidVersionException e) {
-      if (logger.isDebugEnabled()) {
-        logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
-      }
-
-      if (test_InvalidVersionAction != null) {
-        test_InvalidVersionAction.run();
-      }
-      super.distribute();
-    }
-  }
-
-  @Override
   protected Set getRecipients() {
     CacheDistributionAdvisor advisor = getRegion().getCacheDistributionAdvisor();
     return advisor.adviseUpdate(getEvent());

http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index d92ddab..136d7b9 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -577,21 +577,32 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) {
-    if (!event.isOriginRemote() && !event.isNetSearch() && getBucketAdvisor().isPrimary()) {
-      if (event.isBulkOpInProgress()) {
-        // consolidate the UpdateOperation for each entry into a PutAllMessage
-        // since we did not call basicPutPart3(), so we have to explicitly addEntry here
-        event.getPutAllOperation().addEntry(event, this.getId());
-      } else {
-        new UpdateOperation(event, lastModified).distribute();
-        if (logger.isDebugEnabled()) {
-          logger.debug("sent update operation : for region  : {}: with event: {}", this.getName(),
-              event);
+    long token = -1;
+    UpdateOperation op = null;
+
+    try {
+      if (!event.isOriginRemote() && !event.isNetSearch() && getBucketAdvisor().isPrimary()) {
+        if (event.isBulkOpInProgress()) {
+          // consolidate the UpdateOperation for each entry into a PutAllMessage
+          // since we did not call basicPutPart3(), so we have to explicitly addEntry here
+          event.getPutAllOperation().addEntry(event, this.getId());
+        } else {
+          // before distribute: BR's put
+          op = new UpdateOperation(event, lastModified);
+          token = op.startOperation();
+          if (logger.isDebugEnabled()) {
+            logger.debug("sent update operation : for region  : {}: with event: {}", this.getName(),
+                event);
+          }
         }
       }
-    }
-    if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later
-      event.invokeCallbacks(this, true, true);
+      if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later
+        event.invokeCallbacks(this, true, true);
+      }
+    } finally {
+      if (op != null) {
+        op.endOperation(token);
+      }
     }
   }
 
@@ -607,40 +618,54 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     // timestamp returned from basicPutPart2, but as a bucket we want to do
     // distribution *before* we do basicPutPart2.
     final long modifiedTime = event.getEventTime(lastModified);
-    // Update the get stats if necessary.
-    if (this.partitionedRegion.getDataStore().hasClientInterest(event)) {
-      updateStatsForGet(entry, true);
-    }
-    if (!event.isOriginRemote()) {
-      if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
-        boolean eventHasDelta = event.getDeltaBytes() != null;
-        VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event);
-        if (v != null) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("generated version tag {} in region {}", v, this.getName());
+
+    long token = -1;
+    UpdateOperation op = null;
+
+    try {
+      // Update the get stats if necessary.
+      if (this.partitionedRegion.getDataStore().hasClientInterest(event)) {
+        updateStatsForGet(entry, true);
+      }
+      if (!event.isOriginRemote()) {
+        if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+          boolean eventHasDelta = event.getDeltaBytes() != null;
+          VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event);
+          if (v != null) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("generated version tag {} in region {}", v, this.getName());
+            }
           }
         }
-      }
 
-      // This code assumes it is safe ignore token mode (GII in progress)
-      // because it assumes when the origin of the event is local,
-      // the GII has completed and the region is initialized and open for local
-      // ops
-      if (!event.isBulkOpInProgress()) {
-        long start = this.partitionedRegion.getPrStats().startSendReplication();
-        try {
-          UpdateOperation op = new UpdateOperation(event, modifiedTime);
-          op.distribute();
-        } finally {
-          this.partitionedRegion.getPrStats().endSendReplication(start);
+        // This code assumes it is safe ignore token mode (GII in progress)
+        // because it assumes when the origin of the event is local,
+        // the GII has completed and the region is initialized and open for local
+        // ops
+
+        if (!event.isBulkOpInProgress()) {
+          long start = this.partitionedRegion.getPrStats().startSendReplication();
+          try {
+            // before distribute: PR's put PR
+            op = new UpdateOperation(event, modifiedTime);
+            token = op.startOperation();
+          } finally {
+            this.partitionedRegion.getPrStats().endSendReplication(start);
+          }
+        } else {
+          // consolidate the UpdateOperation for each entry into a PutAllMessage
+          // basicPutPart3 takes care of this
         }
-      } else {
-        // consolidate the UpdateOperation for each entry into a PutAllMessage
-        // basicPutPart3 takes care of this
       }
-    }
 
-    return super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict);
+      long lastModifiedTime =
+          super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict);
+      return lastModifiedTime;
+    } finally {
+      if (op != null) {
+        op.endOperation(token);
+      }
+    }
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
@@ -883,39 +908,59 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeInvalidateOperation(EntryEventImpl event) {
-    if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
-      // This cache has processed the event, forward operation
-      // and event messages to backup buckets
-      new InvalidateOperation(event).distribute();
+    InvalidateOperation op = null;
+    long token = -1;
+    try {
+      if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+        // This cache has processed the event, forward operation
+        // and event messages to backup buckets
+        // before distribute: BR.invalidate hasSeenEvent
+        op = new InvalidateOperation(event);
+        token = op.startOperation();
+      }
+      event.invokeCallbacks(this, true, false);
+    } finally {
+      if (op != null) {
+        op.endOperation(token);
+      }
     }
-    event.invokeCallbacks(this, true, false);
   }
 
   @Override
   void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event, boolean clearConflict,
       boolean invokeCallbacks) {
     // Assumed this is called with the entry synchronized
-    if (!event.isOriginRemote()) {
-      if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
-        VersionTag v = re.generateVersionTag(null, false, this, event);
-        if (logger.isDebugEnabled() && v != null) {
-          logger.debug("generated version tag {} in region {}", v, this.getName());
+    long token = -1;
+    InvalidateOperation op = null;
+
+    try {
+      if (!event.isOriginRemote()) {
+        if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+          VersionTag v = re.generateVersionTag(null, false, this, event);
+          if (logger.isDebugEnabled() && v != null) {
+            logger.debug("generated version tag {} in region {}", v, this.getName());
+          }
+          event.setVersionTag(v);
         }
-        event.setVersionTag(v);
-      }
 
-      // This code assumes it is safe ignore token mode (GII in progress)
-      // because it assumes when the origin of the event is local,
-      // the GII has completed and the region is initialized and open for local
-      // ops
+        // This code assumes it is safe ignore token mode (GII in progress)
+        // because it assumes when the origin of the event is local,
+        // the GII has completed and the region is initialized and open for local
+        // ops
 
-      // This code assumes that this bucket is primary
-      // distribute op to bucket secondaries and event to other listeners
-      InvalidateOperation op = new InvalidateOperation(event);
-      op.distribute();
+        // This code assumes that this bucket is primary
+        // distribute op to bucket secondaries and event to other listeners
+        // before distribute: BR's invalidate
+        op = new InvalidateOperation(event);
+        token = op.startOperation();
+      }
+      super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
+          invokeCallbacks);
+    } finally {
+      if (op != null) {
+        op.endOperation(token);
+      }
     }
-    super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
-        invokeCallbacks);
   }
 
   @Override
@@ -1131,49 +1176,70 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeDestroyOperation(EntryEventImpl event) {
-    if (logger.isTraceEnabled(LogMarker.DM)) {
-      logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}",
-          event);
-    }
-    if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
-      if (event.isBulkOpInProgress()) {
-        // consolidate the DestroyOperation for each entry into a RemoveAllMessage
-        event.getRemoveAllOperation().addEntry(event, this.getId());
-      } else {
-        // This cache has processed the event, forward operation
-        // and event messages to backup buckets
-        event.setOldValueFromRegion();
-        new DestroyOperation(event).distribute();
+    long token = -1;
+    DestroyOperation op = null;
+
+    try {
+      if (logger.isTraceEnabled(LogMarker.DM)) {
+        logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}",
+            event);
+      }
+      if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+        if (event.isBulkOpInProgress()) {
+          // consolidate the DestroyOperation for each entry into a RemoveAllMessage
+          event.getRemoveAllOperation().addEntry(event, this.getId());
+        } else {
+          // This cache has processed the event, forward operation
+          // and event messages to backup buckets
+          // before distribute: BR's destroy, not to trigger callback here
+          event.setOldValueFromRegion();
+          op = new DestroyOperation(event);
+          token = op.startOperation();
+        }
       }
-    }
 
-    if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later
-      event.invokeCallbacks(this, true, false);
+      if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later
+        event.invokeCallbacks(this, true, false);
+      }
+    } finally {
+      if (op != null) {
+        op.endOperation(token);
+      }
     }
   }
 
   @Override
   protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
-    // Assumed this is called with entry synchrony
-    if (!event.isOriginRemote() && !event.isBulkOpInProgress() && !event.getOperation().isLocal()
-        && !Operation.EVICT_DESTROY.equals(event.getOperation())
-        && !(event.isExpiration() && isEntryEvictDestroyEnabled())) {
-
-      if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
-        VersionTag v = entry.generateVersionTag(null, false, this, event);
-        if (logger.isDebugEnabled() && v != null) {
-          logger.debug("generated version tag {} in region {}", v, this.getName());
+    long token = -1;
+    DestroyOperation op = null;
+    try {
+      // Assumed this is called with entry synchrony
+      if (!event.isOriginRemote() && !event.isBulkOpInProgress() && !event.getOperation().isLocal()
+          && !Operation.EVICT_DESTROY.equals(event.getOperation())
+          && !(event.isExpiration() && isEntryEvictDestroyEnabled())) {
+
+        if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+          VersionTag v = entry.generateVersionTag(null, false, this, event);
+          if (logger.isDebugEnabled() && v != null) {
+            logger.debug("generated version tag {} in region {}", v, this.getName());
+          }
         }
-      }
 
-      // This code assumes it is safe ignore token mode (GII in progress)
-      // because it assume when the origin of the event is local,
-      // then GII has completed (the region has been completely initialized)
+        // This code assumes it is safe ignore token mode (GII in progress)
+        // because it assume when the origin of the event is local,
+        // then GII has completed (the region has been completely initialized)
 
-      // This code assumes that this bucket is primary
-      new DestroyOperation(event).distribute();
+        // This code assumes that this bucket is primary
+        // before distribute: BR.destroy for retain
+        op = new DestroyOperation(event);
+        token = op.startOperation();
+      }
+      super.basicDestroyBeforeRemoval(entry, event);
+    } finally {
+      if (op != null) {
+        op.endOperation(token);
+      }
     }
-    super.basicDestroyBeforeRemoval(entry, event);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index bded899..0a9ccd8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -33,6 +33,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.InvalidDeltaException;
+import org.apache.geode.InvalidVersionException;
 import org.apache.geode.SystemFailure;
 import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.cache.CacheFactory;
@@ -78,6 +79,7 @@ import org.apache.geode.internal.offheap.StoredObject;
 import org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
+import org.apache.geode.internal.util.DelayedAction;
 
 /**
  * 
@@ -240,11 +242,74 @@ public abstract class DistributedCacheOperation {
     return true;
   }
 
+  public static volatile DelayedAction test_InvalidVersionAction;
+
+  /**
+   * region's distribution advisor marked that a distribution is about to start, then distribute. It
+   * returns a token, which is view version. Return -1 means the method did not succeed. This method
+   * must be invoked before toDistribute(). This method should pair with endOperation() in
+   * try/finally block.
+   */
+  public long startOperation() {
+    DistributedRegion region = getRegion();
+    long viewVersion = -1;
+    if (this.containsRegionContentChange()) {
+      viewVersion = region.getDistributionAdvisor().startOperation();
+    }
+    if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
+      logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
+          viewVersion);
+    }
+    try {
+      _distribute();
+    } catch (InvalidVersionException e) {
+      if (logger.isDebugEnabled()) {
+        logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
+      }
+
+      if (test_InvalidVersionAction != null) {
+        test_InvalidVersionAction.run();
+      }
+      _distribute();
+    }
+    return viewVersion;
+  }
+
+  /**
+   * region's distribution advisor marked that a distribution is ended. This method should pair with
+   * startOperation in try/finally block.
+   */
+  public void endOperation(long viewVersion) {
+    DistributedRegion region = getRegion();
+    if (viewVersion != -1) {
+      region.getDistributionAdvisor().endOperation(viewVersion);
+      if (logger.isDebugEnabled()) {
+        logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
+            viewVersion);
+      }
+    }
+  }
+
   /**
    * Distribute a cache operation to other members of the distributed system. This method determines
    * who the recipients are and handles careful delivery of the operation to those members.
    */
   public void distribute() {
+    long token = -1;
+    try {
+      token = startOperation();
+    } finally {
+      endOperation(token);
+    }
+  }
+
+  /**
+   * About to distribute a cache operation to other members of the distributed system. This method
+   * determines who the recipients are and handles careful delivery of the operation to those
+   * members. This method should wrapped by startOperation() and endOperation() in try/finally
+   * block.
+   */
+  private void _distribute() {
     DistributedRegion region = getRegion();
     DM mgr = region.getDistributionManager();
     boolean reliableOp = isOperationReliable() && region.requiresReliabilityCheck();
@@ -261,15 +326,6 @@ public abstract class DistributedCacheOperation {
     boolean isPutAll = (this instanceof DistributedPutAllOperation);
     boolean isRemoveAll = (this instanceof DistributedRemoveAllOperation);
 
-    long viewVersion = -1;
-    if (this.containsRegionContentChange()) {
-      viewVersion = region.getDistributionAdvisor().startOperation();
-    }
-    if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
-      logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
-          viewVersion);
-    }
-
     try {
       // Recipients with CacheOp
       Set<InternalDistributedMember> recipients = getRecipients();
@@ -596,11 +652,6 @@ public abstract class DistributedCacheOperation {
           }
         }
 
-        if (viewVersion > 0) {
-          region.getDistributionAdvisor().endOperation(viewVersion);
-          viewVersion = -1;
-        }
-
         /** compute local client routing before waiting for an ack only for a bucket */
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
@@ -639,13 +690,6 @@ public abstract class DistributedCacheOperation {
       throw e;
     } finally {
       ReplyProcessor21.setShortSevereAlertProcessing(false);
-      if (viewVersion != -1) {
-        if (logger.isDebugEnabled()) {
-          logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
-              viewVersion);
-        }
-        region.getDistributionAdvisor().endOperation(viewVersion);
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 8cdc517..926f36d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -437,6 +437,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
         distribute = false;
       }
       if (distribute) {
+        // before distribute: DR's put, it has notified gateway sender earlier
         UpdateOperation op = new UpdateOperation(event, lastModified);
         if (logger.isTraceEnabled()) {
           logger.trace("distributing operation for event : {} : for region : {}", event,
@@ -1657,6 +1658,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
           event.getRemoveAllOperation().addEntry(event, true);
         }
         if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          // before distribute: DR.destroy, hasSeenEvent. no to notifyGateway
           distributeDestroy(event, expectedOldValue);
           event.invokeCallbacks(this, true, false);
         }
@@ -1676,6 +1678,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
       boolean distribute = !event.getInhibitDistribution();
       if (distribute) {
+        // before distribute: DR.destroy, it has notifiedGatewaySender ealier
         DestroyOperation op = new DestroyOperation(event);
         op.distribute();
       }
@@ -1858,6 +1861,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
       if (event.isDistributed() && !event.isOriginRemote()) {
         boolean distribute = !event.getInhibitDistribution();
         if (distribute) {
+          // before distribute: DR.invalidate, it has triggered callback earlier
           InvalidateOperation op = new InvalidateOperation(event);
           op.distribute();
         }
@@ -1890,6 +1894,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
         && !isTX() /* only distribute if non-tx */) {
       if (event.isDistributed() && !event.isOriginRemote()) {
+        // before distribute: DR has sent callback earlier
         UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
         op.distribute();
       }
@@ -3339,28 +3344,33 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
   /**
    * Distribute the PutAllOp. This implementation distributes it to peers.
    * 
+   * @return token >0 means startOperation finished distribution
    * @since GemFire 5.7
    */
   @Override
-  public void postPutAllSend(DistributedPutAllOperation putAllOp,
+  public long postPutAllSend(DistributedPutAllOperation putAllOp,
       VersionedObjectList successfulPuts) {
+    long token = -1;
     if (putAllOp.putAllDataSize > 0) {
-      putAllOp.distribute();
+      token = putAllOp.startOperation();
     } else {
       if (logger.isDebugEnabled()) {
         logger.debug("DR.postPutAll: no data to distribute");
       }
     }
+    return token;
   }
 
   @Override
-  public void postRemoveAllSend(DistributedRemoveAllOperation op,
+  public long postRemoveAllSend(DistributedRemoveAllOperation op,
       VersionedObjectList successfulOps) {
+    long token = -1;
     if (op.removeAllDataSize > 0) {
-      op.distribute();
+      token = op.startOperation();
     } else {
       getCache().getLoggerI18n().fine("DR.postRemoveAll: no data to distribute");
     }
+    return token;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 297c90b..4b1786f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -10458,14 +10458,16 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     }
   }
 
-  public void postPutAllSend(DistributedPutAllOperation putallOp,
+  public long postPutAllSend(DistributedPutAllOperation putallOp,
       VersionedObjectList successfulPuts) {
     /* No-op for local region of course */
+    return -1;
   }
 
-  public void postRemoveAllSend(DistributedRemoveAllOperation op,
+  public long postRemoveAllSend(DistributedRemoveAllOperation op,
       VersionedObjectList successfulOps) {
     /* No-op for local region of course */
+    return -1;
   }
 
 

http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 119dd43..b4aa20b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -311,8 +311,16 @@ public class LocalRegionDataView implements InternalDataView {
       successfulPuts.clear();
       putallOp.fillVersionedObjectList(successfulPuts);
     }
-    region.postPutAllSend(putallOp, successfulPuts);
-    region.postPutAllFireEvents(putallOp, successfulPuts);
+    // BR & DR's putAll
+    long token = -1;
+    try {
+      token = region.postPutAllSend(putallOp, successfulPuts);
+      region.postPutAllFireEvents(putallOp, successfulPuts);
+    } finally {
+      if (region instanceof DistributedRegion) {
+        putallOp.endOperation(token);
+      }
+    }
   }
 
   @Override
@@ -325,7 +333,15 @@ public class LocalRegionDataView implements InternalDataView {
       successfulOps.clear();
       op.fillVersionedObjectList(successfulOps);
     }
-    region.postRemoveAllSend(op, successfulOps);
-    region.postRemoveAllFireEvents(op, successfulOps);
+    // BR, DR's removeAll
+    long token = -1;
+    try {
+      token = region.postRemoveAllSend(op, successfulOps);
+      region.postRemoveAllFireEvents(op, successfulOps);
+    } finally {
+      if (region instanceof DistributedRegion) {
+        op.endOperation(token);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 0f5e316..614de4d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -2077,7 +2077,7 @@ public class PartitionedRegion extends LocalRegion
    * @param successfulPuts not used in PartitionedRegion.
    */
   @Override
-  public void postPutAllSend(DistributedPutAllOperation putallO,
+  public long postPutAllSend(DistributedPutAllOperation putallO,
       VersionedObjectList successfulPuts) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
@@ -2174,10 +2174,11 @@ public class PartitionedRegion extends LocalRegion
        * != null) { e.release(); } } }
        */
     }
+    return -1;
   }
 
   @Override
-  public void postRemoveAllSend(DistributedRemoveAllOperation op,
+  public long postRemoveAllSend(DistributedRemoveAllOperation op,
       VersionedObjectList successfulOps) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
 
@@ -2264,6 +2265,7 @@ public class PartitionedRegion extends LocalRegion
         }
       }
     }
+    return -1;
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/geode/blob/7e607de7/geode-core/src/test/java/org/apache/geode/internal/cache/Bug45934DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/Bug45934DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/Bug45934DUnitTest.java
index 0bf501f..8021b97 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/Bug45934DUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/Bug45934DUnitTest.java
@@ -63,13 +63,13 @@ public class Bug45934DUnitTest extends JUnit4CacheTestCase {
     Region<Integer, Integer> region = rf.create(name);
 
     // 3. reset the error flag after initial failure
-    AbstractUpdateOperation.test_InvalidVersionAction = new DelayedAction(new Runnable() {
+    DistributedCacheOperation.test_InvalidVersionAction = new DelayedAction(new Runnable() {
       @Override
       public void run() {
         unsetRemoteFlag(remote);
       }
     });
-    AbstractUpdateOperation.test_InvalidVersionAction.allowToProceed();
+    DistributedCacheOperation.test_InvalidVersionAction.allowToProceed();
 
     // 3. put data
     Map<Integer, Integer> values = new HashMap<Integer, Integer>();