You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/04/21 16:58:35 UTC

geode git commit: GEODE-2806: if the batch is dispatched, even if the bucket is no longer primary, the batch should still be deleted as planned.

Repository: geode
Updated Branches:
  refs/heads/develop 50686b0b4 -> 0862174c3


GEODE-2806: if the batch is dispatched, even if the bucket is no longer primary, the batch should still be deleted as planned.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0862174c
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0862174c
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0862174c

Branch: refs/heads/develop
Commit: 0862174c30cad1536f2c105b783653bd0d4344e8
Parents: 50686b0
Author: zhouxh <gz...@pivotal.io>
Authored: Fri Apr 21 09:57:05 2017 -0700
Committer: zhouxh <gz...@pivotal.io>
Committed: Fri Apr 21 09:58:19 2017 -0700

----------------------------------------------------------------------
 .../parallel/ParallelGatewaySenderQueue.java    | 58 ++++++++++----------
 1 file changed, 28 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0862174c/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index cf4c5a9..9696b90 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -1104,38 +1104,36 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
   private void destroyEventFromQueue(PartitionedRegion prQ, int bucketId, Object key) {
     boolean isPrimary = prQ.getRegionAdvisor().getBucketAdvisor(bucketId).isPrimary();
-    if (isPrimary) {
-      BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
-      // TODO : Kishor : Make sure we dont need to initalize a bucket
-      // before destroying a key from it
-      try {
-        if (brq != null) {
-          brq.destroyKey(key);
-        }
-        stats.decQueueSize();
-      } catch (EntryNotFoundException e) {
-        if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) {
-          logger.debug(
-              "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}",
-              key, this, bucketId, this.sender);
-        }
-      } catch (ForceReattemptException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Bucket :{} moved to other member", bucketId);
-        }
-      } catch (PrimaryBucketException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Primary bucket :{} moved to other member", bucketId);
-        }
-      } catch (RegionDestroyedException e) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}",
-              key, bucketId, prQ.getFullPath());
-        }
+    BucketRegionQueue brq = getBucketRegionQueueByBucketId(prQ, bucketId);
+    // TODO : Kishor : Make sure we dont need to initalize a bucket
+    // before destroying a key from it
+    try {
+      if (brq != null) {
+        brq.destroyKey(key);
+      }
+      stats.decQueueSize();
+    } catch (EntryNotFoundException e) {
+      if (!this.sender.isBatchConflationEnabled() && logger.isDebugEnabled()) {
+        logger.debug(
+            "ParallelGatewaySenderQueue#remove: Got EntryNotFoundException while removing key {} for {} for bucket = {} for GatewaySender {}",
+            key, this, bucketId, this.sender);
+      }
+    } catch (ForceReattemptException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Bucket :{} moved to other member", bucketId);
+      }
+    } catch (PrimaryBucketException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Primary bucket :{} moved to other member", bucketId);
+      }
+    } catch (RegionDestroyedException e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Caught RegionDestroyedException attempting to remove key {} from bucket {} in {}", key,
+            bucketId, prQ.getFullPath());
       }
-      addRemovedEvent(prQ, bucketId, key);
     }
+    addRemovedEvent(prQ, bucketId, key);
   }
 
   public void resetLastPeeked() {