You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2017/08/24 01:24:42 UTC

geode git commit: GEODE-3276: Managing race conditions while the senders are stopped

Repository: geode
Updated Branches:
  refs/heads/develop a5469147d -> 0daf9549a


GEODE-3276: Managing race conditions while the senders are stopped

	* When a connection is initialized, a readAckThread may be alive from a previous incarnation.
	* This AckThread will be stuck on a read socket with no timeout as nothing was dispatched.
	* Also while it was stuck on the read, it will hold a connection lifecycle read lock
	* The initialize connection needs a connection life cycle write lock to start the connection but the read lock is held by the ack thread.
	* This results in a deadlock and eventually a hang.
	* Another situation is that we set the flag isStopped for the event processor before actually shutting down the diapatcher and ack thread.
	* So after the flag is set and before actually shutting down the dispatcher and ackThread, a gateway proxy stomper thread gets in between these two steps of execution.
	* The stomper thread checks the isStopped flag, which was set to true, and proceeds to destroy the connection pool. However the dispatcher and ackThread were still running.
	* This results in a out of heap memory exception while the ack thread is reading from the socket while connection pool was destroyed.
	* To solve this issue, the stomper thread checks if the event processor and dispatcher exists, if true then we close the input streams before destroying the connection pool.

	This closes #732


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0daf9549
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0daf9549
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0daf9549

Branch: refs/heads/develop
Commit: 0daf9549a69a135ad1ab267be8e0749e00986dfa
Parents: a546914
Author: nabarun <nn...@pivotal.io>
Authored: Thu Aug 17 10:51:15 2017 -0700
Committer: nabarun <nn...@pivotal.io>
Committed: Wed Aug 23 18:18:29 2017 -0700

----------------------------------------------------------------------
 .../internal/ParallelAsyncEventQueueImpl.java           | 10 +---------
 .../asyncqueue/internal/SerialAsyncEventQueueImpl.java  |  7 +------
 .../geode/internal/cache/wan/AbstractGatewaySender.java | 12 ++++++++++++
 .../cache/wan/GatewaySenderEventCallbackDispatcher.java |  5 +++++
 .../cache/wan/GatewaySenderEventDispatcher.java         |  2 ++
 .../cache/wan/GatewaySenderEventRemoteDispatcher.java   |  9 +++++++++
 .../cache/wan/parallel/ParallelGatewaySenderImpl.java   |  6 +-----
 .../cache/wan/serial/SerialGatewaySenderImpl.java       |  7 +------
 8 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 9fa15f7..538b65a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -30,7 +30,6 @@ import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
-import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
 import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
 import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
@@ -109,14 +108,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
       if (!this.isRunning()) {
         return;
       }
-      // Stop the dispatcher
-      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
-      if (ev != null && !ev.isStopped()) {
-        ev.stopProcessing();
-      }
-
-      // Stop the proxy (after the dispatcher, so the socket is still
-      // alive until after the dispatcher has stopped)
+      stopProcessing();
       stompProxyDead();
 
       // Close the listeners

http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index a1d933f..e6bc84c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -30,7 +30,6 @@ import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
-import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
 import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
 import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
@@ -124,11 +123,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
     this.getLifeCycleLock().writeLock().lock();
     try {
       // Stop the dispatcher
-      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
-      if (ev != null && !ev.isStopped()) {
-        ev.stopProcessing();
-      }
-
+      stopProcessing();
       // Stop the proxy (after the dispatcher, so the socket is still
       // alive until after the dispatcher has stopped)
       stompProxyDead();

http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 2154ffe..9b3b61f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -604,6 +604,18 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     return enqueue;
   }
 
+  protected void stopProcessing() {
+    // Stop the dispatcher
+    AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+    if (ev != null && !ev.isStopped()) {
+      ev.stopProcessing();
+    }
+
+    if (ev != null && ev.getDispatcher() != null) {
+      ev.getDispatcher().shutDownAckReaderConnection();
+    }
+  }
+
   protected void stompProxyDead() {
     Runnable stomper = new Runnable() {
       public void run() {

http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
index efdd0ce..b94132b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
@@ -183,4 +183,9 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD
     // no op
 
   }
+
+  @Override
+  public void shutDownAckReaderConnection() {
+    // no op
+  }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
index 5bb5333..402210b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
@@ -29,4 +29,6 @@ public interface GatewaySenderEventDispatcher {
   public boolean isConnectedToRemote();
 
   public void stop();
+
+  public void shutDownAckReaderConnection();
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3a41972..6c99168 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -363,6 +363,9 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    * @throws GatewaySenderException
    */
   private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
+    if (ackReaderThread != null) {
+      ackReaderThread.shutDownAckReaderConnection();
+    }
     this.connectionLifeCycleLock.writeLock().lock();
     try {
       // Attempt to acquire a connection
@@ -821,6 +824,12 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
     return connection != null && !connection.isDestroyed();
   }
 
+  public void shutDownAckReaderConnection() {
+    if (ackReaderThread != null) {
+      ackReaderThread.shutDownAckReaderConnection();
+    }
+  }
+
   public void stop() {
     stopAckReaderThread();
     if (this.processor.isStopped()) {

http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index daeeb6f..d023704 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -30,7 +30,6 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.AbstractRemoteGatewaySender;
 import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
 import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
@@ -103,10 +102,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
         return;
       }
       // Stop the dispatcher
-      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
-      if (ev != null && !ev.isStopped()) {
-        ev.stopProcessing();
-      }
+      stopProcessing();
 
       // Stop the proxy (after the dispatcher, so the socket is still
       // alive until after the dispatcher has stopped)

http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index fe83d60..f4235b9 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -116,15 +116,10 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
     this.getLifeCycleLock().writeLock().lock();
     try {
       // Stop the dispatcher
-      AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
-      if (ev != null && !ev.isStopped()) {
-        ev.stopProcessing();
-      }
-
+      stopProcessing();
       // Stop the proxy (after the dispatcher, so the socket is still
       // alive until after the dispatcher has stopped)
       stompProxyDead();
-
       // Close the listeners
       for (AsyncEventListener listener : this.listeners) {
         listener.close();