You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2017/08/24 01:24:42 UTC
geode git commit: GEODE-3276: Managing race conditions while the
senders are stopped
Repository: geode
Updated Branches:
refs/heads/develop a5469147d -> 0daf9549a
GEODE-3276: Managing race conditions while the senders are stopped
* When a connection is initialized, a readAckThread may be alive from a previous incarnation.
* This AckThread will be stuck on a read socket with no timeout as nothing was dispatched.
* Also, while it is stuck on the read, it holds a connection lifecycle read lock.
* Initializing the connection needs the connection lifecycle write lock to start the connection, but the read lock is held by the ack thread.
* This results in a deadlock and eventually a hang.
* Another situation is that we set the flag isStopped for the event processor before actually shutting down the dispatcher and ack thread.
* So after the flag is set and before actually shutting down the dispatcher and ackThread, a gateway proxy stomper thread gets in between these two steps of execution.
* The stomper thread checks the isStopped flag, which was set to true, and proceeds to destroy the connection pool. However the dispatcher and ackThread were still running.
* This results in an out of heap memory exception, because the ack thread is reading from the socket while the connection pool has been destroyed.
* To solve this issue, the stomper thread checks whether the event processor and dispatcher exist; if they do, we close the input streams before destroying the connection pool.
This closes #732
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0daf9549
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0daf9549
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0daf9549
Branch: refs/heads/develop
Commit: 0daf9549a69a135ad1ab267be8e0749e00986dfa
Parents: a546914
Author: nabarun <nn...@pivotal.io>
Authored: Thu Aug 17 10:51:15 2017 -0700
Committer: nabarun <nn...@pivotal.io>
Committed: Wed Aug 23 18:18:29 2017 -0700
----------------------------------------------------------------------
.../internal/ParallelAsyncEventQueueImpl.java | 10 +---------
.../asyncqueue/internal/SerialAsyncEventQueueImpl.java | 7 +------
.../geode/internal/cache/wan/AbstractGatewaySender.java | 12 ++++++++++++
.../cache/wan/GatewaySenderEventCallbackDispatcher.java | 5 +++++
.../cache/wan/GatewaySenderEventDispatcher.java | 2 ++
.../cache/wan/GatewaySenderEventRemoteDispatcher.java | 9 +++++++++
.../cache/wan/parallel/ParallelGatewaySenderImpl.java | 6 +-----
.../cache/wan/serial/SerialGatewaySenderImpl.java | 7 +------
8 files changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
index 9fa15f7..538b65a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/ParallelAsyncEventQueueImpl.java
@@ -30,7 +30,6 @@ import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
-import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
@@ -109,14 +108,7 @@ public class ParallelAsyncEventQueueImpl extends AbstractGatewaySender {
if (!this.isRunning()) {
return;
}
- // Stop the dispatcher
- AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
- if (ev != null && !ev.isStopped()) {
- ev.stopProcessing();
- }
-
- // Stop the proxy (after the dispatcher, so the socket is still
- // alive until after the dispatcher has stopped)
+ stopProcessing();
stompProxyDead();
// Close the listeners
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
index a1d933f..e6bc84c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/SerialAsyncEventQueueImpl.java
@@ -30,7 +30,6 @@ import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
-import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException;
@@ -124,11 +123,7 @@ public class SerialAsyncEventQueueImpl extends AbstractGatewaySender {
this.getLifeCycleLock().writeLock().lock();
try {
// Stop the dispatcher
- AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
- if (ev != null && !ev.isStopped()) {
- ev.stopProcessing();
- }
-
+ stopProcessing();
// Stop the proxy (after the dispatcher, so the socket is still
// alive until after the dispatcher has stopped)
stompProxyDead();
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 2154ffe..9b3b61f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -604,6 +604,18 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return enqueue;
}
+ protected void stopProcessing() {
+ // Stop the dispatcher
+ AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
+ if (ev != null && !ev.isStopped()) {
+ ev.stopProcessing();
+ }
+
+ if (ev != null && ev.getDispatcher() != null) {
+ ev.getDispatcher().shutDownAckReaderConnection();
+ }
+ }
+
protected void stompProxyDead() {
Runnable stomper = new Runnable() {
public void run() {
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
index efdd0ce..b94132b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackDispatcher.java
@@ -183,4 +183,9 @@ public class GatewaySenderEventCallbackDispatcher implements GatewaySenderEventD
// no op
}
+
+ @Override
+ public void shutDownAckReaderConnection() {
+ // no op
+ }
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
index 5bb5333..402210b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventDispatcher.java
@@ -29,4 +29,6 @@ public interface GatewaySenderEventDispatcher {
public boolean isConnectedToRemote();
public void stop();
+
+ public void shutDownAckReaderConnection();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3a41972..6c99168 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -363,6 +363,9 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
* @throws GatewaySenderException
*/
private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
+ if (ackReaderThread != null) {
+ ackReaderThread.shutDownAckReaderConnection();
+ }
this.connectionLifeCycleLock.writeLock().lock();
try {
// Attempt to acquire a connection
@@ -821,6 +824,12 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
return connection != null && !connection.isDestroyed();
}
+ public void shutDownAckReaderConnection() {
+ if (ackReaderThread != null) {
+ ackReaderThread.shutDownAckReaderConnection();
+ }
+ }
+
public void stop() {
stopAckReaderThread();
if (this.processor.isStopped()) {
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index daeeb6f..d023704 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -30,7 +30,6 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
-import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.AbstractRemoteGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderAdvisor.GatewaySenderProfile;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
@@ -103,10 +102,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
return;
}
// Stop the dispatcher
- AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
- if (ev != null && !ev.isStopped()) {
- ev.stopProcessing();
- }
+ stopProcessing();
// Stop the proxy (after the dispatcher, so the socket is still
// alive until after the dispatcher has stopped)
http://git-wip-us.apache.org/repos/asf/geode/blob/0daf9549/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index fe83d60..f4235b9 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -116,15 +116,10 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
this.getLifeCycleLock().writeLock().lock();
try {
// Stop the dispatcher
- AbstractGatewaySenderEventProcessor ev = this.eventProcessor;
- if (ev != null && !ev.isStopped()) {
- ev.stopProcessing();
- }
-
+ stopProcessing();
// Stop the proxy (after the dispatcher, so the socket is still
// alive until after the dispatcher has stopped)
stompProxyDead();
-
// Close the listeners
for (AsyncEventListener listener : this.listeners) {
listener.close();