You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2016/07/21 20:52:25 UTC

incubator-geode git commit: GEODE-1677: Events are now added to tmpQueuedEvents while the shadow PR is created

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 54f305be7 -> 9103a3db8


GEODE-1677: Events are now added to tmpQueuedEvents while the shadow PR is created


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9103a3db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9103a3db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9103a3db

Branch: refs/heads/develop
Commit: 9103a3db8642b9b9b4e00aa2d9a8ae3a0ddbc906
Parents: 54f305b
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Wed Jul 13 17:33:05 2016 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Thu Jul 21 13:47:54 2016 -0700

----------------------------------------------------------------------
 ...rentParallelGatewaySenderEventProcessor.java |  2 +-
 .../ConcurrentParallelGatewaySenderQueue.java   | 23 ++++++++++++++++----
 2 files changed, 20 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9103a3db/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 04015f7..8b6a700 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -107,7 +107,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
     createProcessors(sender.getDispatcherThreads(), targetRs);
     
 //    this.queue = parallelQueue;
-    this.queue = new ConcurrentParallelGatewaySenderQueue(this.processors);
+    this.queue = new ConcurrentParallelGatewaySenderQueue(sender, this.processors);
     setDaemon(true);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9103a3db/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
index f995ba4..ccdf42a 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderQueue.java
@@ -27,6 +27,7 @@ import com.gemstone.gemfire.internal.cache.DistributedRegion;
 import com.gemstone.gemfire.internal.cache.ForceReattemptException;
 import com.gemstone.gemfire.internal.cache.PartitionedRegion;
 import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
 import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
 import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
 import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
@@ -53,10 +54,13 @@ import com.gemstone.gemfire.internal.size.SingleObjectSizer;
  */
 public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
 
+  private final AbstractGatewaySender sender;
+
   private final ParallelGatewaySenderEventProcessor processors[];
   
-  public ConcurrentParallelGatewaySenderQueue(
+  public ConcurrentParallelGatewaySenderQueue(AbstractGatewaySender sender,
 		  ParallelGatewaySenderEventProcessor pro[]) {
+    this.sender = sender;
     this.processors = pro;
   }
   
@@ -168,9 +172,20 @@ public class ConcurrentParallelGatewaySenderQueue implements RegionQueue {
   }
   
   public void addShadowPartitionedRegionForUserPR(PartitionedRegion pr) {
-	for(int i =0; i< processors.length; i++){
-	  processors[i].addShadowPartitionedRegionForUserPR(pr);
-	 }
+    // Reset enqueuedAllTempQueueEvents if the sender is running
+    // This is done so that any events received while the shadow PR is added are queued in the tmpQueuedEvents
+    // instead of blocking the distribute call which could cause a deadlock. See GEM-801.
+    if (this.sender.isRunning()) {
+      this.sender.setEnqueuedAllTempQueueEvents(false);
+    }
+    this.sender.getLifeCycleLock().writeLock().lock();
+    try {
+      for (int i = 0; i < processors.length; i++) {
+        processors[i].addShadowPartitionedRegionForUserPR(pr);
+      }
+    } finally {
+      this.sender.getLifeCycleLock().writeLock().unlock();
+    }
   }
   
   private ParallelGatewaySenderEventProcessor getPGSProcessor(int bucketId) {