You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/08 01:44:05 UTC

geode git commit: GEM-1353

Repository: geode
Updated Branches:
  refs/heads/feature/GEM-1353 799548ee4 -> b9e8ac598


GEM-1353


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b9e8ac59
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b9e8ac59
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b9e8ac59

Branch: refs/heads/feature/GEM-1353
Commit: b9e8ac59892aed7f2ddea62f54b07ddac688456e
Parents: 799548e
Author: zhouxh <gz...@pivotal.io>
Authored: Fri Apr 7 18:43:43 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Fri Apr 7 18:43:43 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/BucketRegion.java      | 272 ++++++++++++-------
 .../cache/DistributedCacheOperation.java        |  46 ++--
 .../geode/internal/cache/DistributedRegion.java |  49 +++-
 .../internal/cache/LocalRegionDataView.java     |  20 +-
 4 files changed, 259 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index d92ddab..03aa3ef 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -577,21 +577,33 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeUpdateOperation(EntryEventImpl event, long lastModified) {
-    if (!event.isOriginRemote() && !event.isNetSearch() && getBucketAdvisor().isPrimary()) {
-      if (event.isBulkOpInProgress()) {
-        // consolidate the UpdateOperation for each entry into a PutAllMessage
-        // since we did not call basicPutPart3(), so we have to explicitly addEntry here
-        event.getPutAllOperation().addEntry(event, this.getId());
-      } else {
-        new UpdateOperation(event, lastModified).distribute();
-        if (logger.isDebugEnabled()) {
-          logger.debug("sent update operation : for region  : {}: with event: {}", this.getName(),
-              event);
+    long viewVersion = -1;
+    UpdateOperation op = null;
+
+    try {
+      if (!event.isOriginRemote() && !event.isNetSearch() && getBucketAdvisor().isPrimary()) {
+        if (event.isBulkOpInProgress()) {
+          // consolidate the UpdateOperation for each entry into a PutAllMessage
+          // since we did not call basicPutPart3(), so we have to explicitly addEntry here
+          event.getPutAllOperation().addEntry(event, this.getId());
+        } else {
+          // BR's put
+          op = new UpdateOperation(event, lastModified);
+          viewVersion = op.startOperation();
+          op.distribute();
+          if (logger.isDebugEnabled()) {
+            logger.debug("sent update operation : for region  : {}: with event: {}", this.getName(),
+                event);
+          }
         }
       }
-    }
-    if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later
-      event.invokeCallbacks(this, true, true);
+      if (!event.getOperation().isPutAll()) { // putAll will invoke listeners later
+        event.invokeCallbacks(this, true, true);
+      }
+    } finally {
+      if (op != null) {
+        op.endOperation(viewVersion);
+      }
     }
   }
 
@@ -607,40 +619,55 @@ public class BucketRegion extends DistributedRegion implements Bucket {
     // timestamp returned from basicPutPart2, but as a bucket we want to do
     // distribution *before* we do basicPutPart2.
     final long modifiedTime = event.getEventTime(lastModified);
-    // Update the get stats if necessary.
-    if (this.partitionedRegion.getDataStore().hasClientInterest(event)) {
-      updateStatsForGet(entry, true);
-    }
-    if (!event.isOriginRemote()) {
-      if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
-        boolean eventHasDelta = event.getDeltaBytes() != null;
-        VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event);
-        if (v != null) {
-          if (logger.isDebugEnabled()) {
-            logger.debug("generated version tag {} in region {}", v, this.getName());
+
+    long viewVersion = -1;
+    UpdateOperation op = null;
+
+    try {
+      // Update the get stats if necessary.
+      if (this.partitionedRegion.getDataStore().hasClientInterest(event)) {
+        updateStatsForGet(entry, true);
+      }
+      if (!event.isOriginRemote()) {
+        if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+          boolean eventHasDelta = event.getDeltaBytes() != null;
+          VersionTag v = entry.generateVersionTag(null, eventHasDelta, this, event);
+          if (v != null) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("generated version tag {} in region {}", v, this.getName());
+            }
           }
         }
-      }
 
-      // This code assumes it is safe ignore token mode (GII in progress)
-      // because it assumes when the origin of the event is local,
-      // the GII has completed and the region is initialized and open for local
-      // ops
-      if (!event.isBulkOpInProgress()) {
-        long start = this.partitionedRegion.getPrStats().startSendReplication();
-        try {
-          UpdateOperation op = new UpdateOperation(event, modifiedTime);
-          op.distribute();
-        } finally {
-          this.partitionedRegion.getPrStats().endSendReplication(start);
+        // This code assumes it is safe ignore token mode (GII in progress)
+        // because it assumes when the origin of the event is local,
+        // the GII has completed and the region is initialized and open for local
+        // ops
+
+        if (!event.isBulkOpInProgress()) {
+          long start = this.partitionedRegion.getPrStats().startSendReplication();
+          try {
+            // PR's put PR
+            op = new UpdateOperation(event, modifiedTime);
+            viewVersion = op.startOperation();
+            op.distribute();
+          } finally {
+            this.partitionedRegion.getPrStats().endSendReplication(start);
+          }
+        } else {
+          // consolidate the UpdateOperation for each entry into a PutAllMessage
+          // basicPutPart3 takes care of this
         }
-      } else {
-        // consolidate the UpdateOperation for each entry into a PutAllMessage
-        // basicPutPart3 takes care of this
       }
-    }
 
-    return super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict);
+      long lastModifiedTime =
+          super.basicPutPart2(event, entry, isInitialized, lastModified, clearConflict);
+      return lastModifiedTime;
+    } finally {
+      if (op != null) {
+        op.endOperation(viewVersion);
+      }
+    }
   }
 
   protected void notifyGatewaySender(EnumListenerEvent operation, EntryEventImpl event) {
@@ -883,39 +910,61 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeInvalidateOperation(EntryEventImpl event) {
-    if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
-      // This cache has processed the event, forward operation
-      // and event messages to backup buckets
-      new InvalidateOperation(event).distribute();
+    InvalidateOperation op = null;
+    long viewVersion = -1;
+    try {
+      if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+        // This cache has processed the event, forward operation
+        // and event messages to backup buckets
+        // BR.invalidate hasSeenEvent
+        op = new InvalidateOperation(event);
+        viewVersion = op.startOperation();
+        op.distribute();
+      }
+      event.invokeCallbacks(this, true, false);
+    } finally {
+      if (op != null) {
+        op.endOperation(viewVersion);
+      }
     }
-    event.invokeCallbacks(this, true, false);
   }
 
   @Override
   void basicInvalidatePart2(final RegionEntry re, final EntryEventImpl event, boolean clearConflict,
       boolean invokeCallbacks) {
     // Assumed this is called with the entry synchronized
-    if (!event.isOriginRemote()) {
-      if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
-        VersionTag v = re.generateVersionTag(null, false, this, event);
-        if (logger.isDebugEnabled() && v != null) {
-          logger.debug("generated version tag {} in region {}", v, this.getName());
-        }
-        event.setVersionTag(v);
-      }
+    long viewVersion = -1;
+    InvalidateOperation op = null;
 
-      // This code assumes it is safe ignore token mode (GII in progress)
-      // because it assumes when the origin of the event is local,
-      // the GII has completed and the region is initialized and open for local
-      // ops
+    try {
+      if (!event.isOriginRemote()) {
+        if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+          VersionTag v = re.generateVersionTag(null, false, this, event);
+          if (logger.isDebugEnabled() && v != null) {
+            logger.debug("generated version tag {} in region {}", v, this.getName());
+          }
+          event.setVersionTag(v);
+        }
 
-      // This code assumes that this bucket is primary
-      // distribute op to bucket secondaries and event to other listeners
-      InvalidateOperation op = new InvalidateOperation(event);
-      op.distribute();
+        // This code assumes it is safe ignore token mode (GII in progress)
+        // because it assumes when the origin of the event is local,
+        // the GII has completed and the region is initialized and open for local
+        // ops
+
+        // This code assumes that this bucket is primary
+        // distribute op to bucket secondaries and event to other listeners
+        // BR's invalidate
+        op = new InvalidateOperation(event);
+        viewVersion = op.startOperation();
+        op.distribute();
+      }
+      super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
+          invokeCallbacks);
+    } finally {
+      if (op != null) {
+        op.endOperation(viewVersion);
+      }
     }
-    super.basicInvalidatePart2(re, event, clearConflict /* Clear conflict occurred */,
-        invokeCallbacks);
   }
 
   @Override
@@ -1131,49 +1180,72 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeDestroyOperation(EntryEventImpl event) {
-    if (logger.isTraceEnabled(LogMarker.DM)) {
-      logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}",
-          event);
-    }
-    if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
-      if (event.isBulkOpInProgress()) {
-        // consolidate the DestroyOperation for each entry into a RemoveAllMessage
-        event.getRemoveAllOperation().addEntry(event, this.getId());
-      } else {
-        // This cache has processed the event, forward operation
-        // and event messages to backup buckets
-        event.setOldValueFromRegion();
-        new DestroyOperation(event).distribute();
+    long viewVersion = -1;
+    DestroyOperation op = null;
+
+    try {
+      if (logger.isTraceEnabled(LogMarker.DM)) {
+        logger.trace(LogMarker.DM, "BR.basicDestroy: this cache has already seen this event {}",
+            event);
+      }
+      if (!event.isOriginRemote() && getBucketAdvisor().isPrimary()) {
+        if (event.isBulkOpInProgress()) {
+          // consolidate the DestroyOperation for each entry into a RemoveAllMessage
+          event.getRemoveAllOperation().addEntry(event, this.getId());
+        } else {
+          // This cache has processed the event, forward operation
+          // and event messages to backup buckets
+          // BR's destroy, not to trigger callback here
+          event.setOldValueFromRegion();
+          op = new DestroyOperation(event);
+          viewVersion = op.startOperation();
+          op.distribute();
+        }
       }
-    }
 
-    if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later
-      event.invokeCallbacks(this, true, false);
+      if (!event.getOperation().isRemoveAll()) { // removeAll will invoke listeners later
+        event.invokeCallbacks(this, true, false);
+      }
+    } finally {
+      if (op != null) {
+        op.endOperation(viewVersion);
+      }
     }
   }
 
   @Override
   protected void basicDestroyBeforeRemoval(RegionEntry entry, EntryEventImpl event) {
-    // Assumed this is called with entry synchrony
-    if (!event.isOriginRemote() && !event.isBulkOpInProgress() && !event.getOperation().isLocal()
-        && !Operation.EVICT_DESTROY.equals(event.getOperation())
-        && !(event.isExpiration() && isEntryEvictDestroyEnabled())) {
-
-      if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
-        VersionTag v = entry.generateVersionTag(null, false, this, event);
-        if (logger.isDebugEnabled() && v != null) {
-          logger.debug("generated version tag {} in region {}", v, this.getName());
+    long viewVersion = -1;
+    DestroyOperation op = null;
+    try {
+      // Assumed this is called with entry synchrony
+      if (!event.isOriginRemote() && !event.isBulkOpInProgress() && !event.getOperation().isLocal()
+          && !Operation.EVICT_DESTROY.equals(event.getOperation())
+          && !(event.isExpiration() && isEntryEvictDestroyEnabled())) {
+
+        if (event.getVersionTag() == null || event.getVersionTag().isGatewayTag()) {
+          VersionTag v = entry.generateVersionTag(null, false, this, event);
+          if (logger.isDebugEnabled() && v != null) {
+            logger.debug("generated version tag {} in region {}", v, this.getName());
+          }
         }
-      }
 
-      // This code assumes it is safe ignore token mode (GII in progress)
-      // because it assume when the origin of the event is local,
-      // then GII has completed (the region has been completely initialized)
+        // This code assumes it is safe ignore token mode (GII in progress)
+        // because it assume when the origin of the event is local,
+        // then GII has completed (the region has been completely initialized)
 
-      // This code assumes that this bucket is primary
-      new DestroyOperation(event).distribute();
+        // This code assumes that this bucket is primary
+        // BR.destroy for retain
+        op = new DestroyOperation(event);
+        viewVersion = op.startOperation();
+        op.distribute();
+      }
+      super.basicDestroyBeforeRemoval(entry, event);
+    } finally {
+      if (op != null) {
+        op.endOperation(viewVersion);
+      }
     }
-    super.basicDestroyBeforeRemoval(entry, event);
   }
 
   @Override
@@ -1261,7 +1333,13 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   protected void distributeUpdateEntryVersionOperation(EntryEventImpl event) {
-    new UpdateEntryVersionOperation(event).distribute();
+    UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
+    long viewVersion = op.startOperation();
+    try {
+      op.distribute();
+    } finally {
+      op.endOperation(viewVersion);
+    }
   }
 
   public int getRedundancyLevel() {

http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index bded899..a050a2b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -240,6 +240,31 @@ public abstract class DistributedCacheOperation {
     return true;
   }
 
+  public long startOperation() {
+    DistributedRegion region = getRegion();
+    long viewVersion = -1;
+    if (this.containsRegionContentChange()) {
+      viewVersion = region.getDistributionAdvisor().startOperation();
+    }
+    if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
+      logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
+          viewVersion);
+    }
+    return viewVersion;
+  }
+
+  public void endOperation(long viewVersion) {
+    DistributedRegion region = getRegion();
+    if (viewVersion > 0) {
+      region.getDistributionAdvisor().endOperation(viewVersion);
+      viewVersion = -1;
+      if (logger.isDebugEnabled()) {
+        logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
+            viewVersion);
+      }
+    }
+  }
+
   /**
    * Distribute a cache operation to other members of the distributed system. This method determines
    * who the recipients are and handles careful delivery of the operation to those members.
@@ -261,15 +286,6 @@ public abstract class DistributedCacheOperation {
     boolean isPutAll = (this instanceof DistributedPutAllOperation);
     boolean isRemoveAll = (this instanceof DistributedRemoveAllOperation);
 
-    long viewVersion = -1;
-    if (this.containsRegionContentChange()) {
-      viewVersion = region.getDistributionAdvisor().startOperation();
-    }
-    if (logger.isTraceEnabled(LogMarker.STATE_FLUSH_OP)) {
-      logger.trace(LogMarker.STATE_FLUSH_OP, "dispatching operation in view version {}",
-          viewVersion);
-    }
-
     try {
       // Recipients with CacheOp
       Set<InternalDistributedMember> recipients = getRecipients();
@@ -596,11 +612,6 @@ public abstract class DistributedCacheOperation {
           }
         }
 
-        if (viewVersion > 0) {
-          region.getDistributionAdvisor().endOperation(viewVersion);
-          viewVersion = -1;
-        }
-
         /** compute local client routing before waiting for an ack only for a bucket */
         if (region.isUsedForPartitionedRegionBucket()) {
           FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
@@ -639,13 +650,6 @@ public abstract class DistributedCacheOperation {
       throw e;
     } finally {
       ReplyProcessor21.setShortSevereAlertProcessing(false);
-      if (viewVersion != -1) {
-        if (logger.isDebugEnabled()) {
-          logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
-              viewVersion);
-        }
-        region.getDistributionAdvisor().endOperation(viewVersion);
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index fa02574..bd9cdd9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -437,12 +437,18 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
         distribute = false;
       }
       if (distribute) {
+        // DR's put, it has notified gateway sender earlier
         UpdateOperation op = new UpdateOperation(event, lastModified);
+        long viewVersion = op.startOperation();
         if (logger.isTraceEnabled()) {
           logger.trace("distributing operation for event : {} : for region : {}", event,
               this.getName());
         }
-        op.distribute();
+        try {
+          op.distribute();
+        } finally {
+          op.endOperation(viewVersion);
+        }
       }
     }
   }
@@ -1657,6 +1663,7 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
           event.getRemoveAllOperation().addEntry(event, true);
         }
         if (!getConcurrencyChecksEnabled() || event.hasValidVersionTag()) {
+          // DR.destroy, hasSeenEvent. no to notifyGateway
           distributeDestroy(event, expectedOldValue);
           event.invokeCallbacks(this, true, false);
         }
@@ -1676,8 +1683,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     if (event.isDistributed() && !event.isOriginRemote() && !event.isBulkOpInProgress()) {
       boolean distribute = !event.getInhibitDistribution();
       if (distribute) {
+        // DR.destroy, it has notifiedGatewaySender ealier
         DestroyOperation op = new DestroyOperation(event);
-        op.distribute();
+        long viewVersion = op.startOperation();
+        try {
+          op.distribute();
+        } finally {
+          op.endOperation(viewVersion);
+        }
       }
     }
   }
@@ -1732,7 +1745,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
    * @since GemFire 5.7
    */
   protected void distributeInvalidateRegion(RegionEventImpl event) {
-    new InvalidateRegionOperation(event).distribute();
+    InvalidateRegionOperation op = new InvalidateRegionOperation(event);
+    long viewVersion = op.startOperation();
+    try {
+      op.distribute();
+    } finally {
+      op.endOperation(viewVersion);
+    }
   }
 
   /**
@@ -1781,7 +1800,13 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     if (persistenceAdvisor != null) {
       persistenceAdvisor.releaseTieLock();
     }
-    new DestroyRegionOperation(event, notifyOfRegionDeparture).distribute();
+    DestroyRegionOperation op = new DestroyRegionOperation(event, notifyOfRegionDeparture);
+    long viewVersion = op.startOperation();
+    try {
+      op.distribute();
+    } finally {
+      op.endOperation(viewVersion);
+    }
   }
 
   /**
@@ -1858,8 +1883,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
       if (event.isDistributed() && !event.isOriginRemote()) {
         boolean distribute = !event.getInhibitDistribution();
         if (distribute) {
+          // DR.invalidate, it has triggered callback earlier
           InvalidateOperation op = new InvalidateOperation(event);
-          op.distribute();
+          long viewVersion = op.startOperation();
+          try {
+            op.distribute();
+          } finally {
+            op.endOperation(viewVersion);
+          }
         }
       }
     }
@@ -1890,8 +1921,14 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     if (!this.regionInvalid && event.isDistributed() && !event.isOriginRemote()
         && !isTX() /* only distribute if non-tx */) {
       if (event.isDistributed() && !event.isOriginRemote()) {
+        // DR has sent callback earlier
         UpdateEntryVersionOperation op = new UpdateEntryVersionOperation(event);
-        op.distribute();
+        long viewVersion = op.startOperation();
+        try {
+          op.distribute();
+        } finally {
+          op.endOperation(viewVersion);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/b9e8ac59/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
index 119dd43..6361aa6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java
@@ -311,8 +311,14 @@ public class LocalRegionDataView implements InternalDataView {
       successfulPuts.clear();
       putallOp.fillVersionedObjectList(successfulPuts);
     }
-    region.postPutAllSend(putallOp, successfulPuts);
-    region.postPutAllFireEvents(putallOp, successfulPuts);
+    // BR & DR's putAll
+    long viewVersion = putallOp.startOperation();
+    try {
+      region.postPutAllSend(putallOp, successfulPuts);
+      region.postPutAllFireEvents(putallOp, successfulPuts);
+    } finally {
+      putallOp.endOperation(viewVersion);
+    }
   }
 
   @Override
@@ -325,7 +331,13 @@ public class LocalRegionDataView implements InternalDataView {
       successfulOps.clear();
       op.fillVersionedObjectList(successfulOps);
     }
-    region.postRemoveAllSend(op, successfulOps);
-    region.postRemoveAllFireEvents(op, successfulOps);
+    // BR, DR's removeAll
+    long viewVersion = op.startOperation();
+    try {
+      region.postRemoveAllSend(op, successfulOps);
+      region.postRemoveAllFireEvents(op, successfulOps);
+    } finally {
+      op.endOperation(viewVersion);
+    }
   }
 }