You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/07/03 17:52:48 UTC
incubator-geode git commit: Revert "GEODE-1588: AckReader and
Dispatching thread are shut down before sending gateway sender close
connection messages"
Repository: incubator-geode
Updated Branches:
refs/heads/develop ec3555f16 -> 96f3af9de
Revert "GEODE-1588: AckReader and Dispatching thread are shut down before sending gateway sender close connection messages"
This reverts commit 8899fc8d744bfd3060bafd17b1b33e02c7db9e5f.
This check-in causes some instability in the WAN DUnit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/96f3af9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/96f3af9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/96f3af9d
Branch: refs/heads/develop
Commit: 96f3af9dec1993cdd2d2109d6a711a74a56afcc3
Parents: ec3555f
Author: Jason Huynh <hu...@gmail.com>
Authored: Sun Jul 3 10:52:28 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Sun Jul 3 10:52:28 2016 -0700
----------------------------------------------------------------------
.../AbstractGatewaySenderEventProcessor.java | 5 +--
...rentParallelGatewaySenderEventProcessor.java | 8 ++---
...urrentSerialGatewaySenderEventProcessor.java | 5 ++-
.../wan/GatewaySenderEventRemoteDispatcher.java | 34 ++++----------------
4 files changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index e3e1a9e..ce08e8d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1150,9 +1150,10 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
}
}
}
-
- setIsStopped(true);
+
dispatcher.stop();
+ //set isStopped to true
+ setIsStopped(true);
if (this.isAlive()) {
this.interrupt();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 82a53d3..07a3be5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -237,9 +237,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
if (!this.isAlive()) {
return;
}
-
- setIsStopped(true);
-
final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
.createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor Logger Group", logger);
@@ -251,12 +248,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
return thread;
}
};
-
+
List<SenderStopperCallable> stopperCallables = new ArrayList<SenderStopperCallable>();
for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
stopperCallables.add(new SenderStopperCallable(parallelProcessor));
}
-
+
ExecutorService stopperService = Executors.newFixedThreadPool(processors.length, threadFactory);
try {
List<Future<Boolean>> futures = stopperService.invokeAll(stopperCallables);
@@ -278,6 +275,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
throw rejectedExecutionEx;
}
+ setIsStopped(true);
stopperService.shutdown();
closeProcessor();
if (logger.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index a557ce1..ff810ec 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -268,8 +268,6 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends
return;
}
- setIsStopped(true);
-
final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
.createThreadGroup(
"ConcurrentSerialGatewaySenderEventProcessor Logger Group",
@@ -314,7 +312,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends
}
//shutdown the stopperService. This will release all the stopper threads
stopperService.shutdown();
-
+ setIsStopped(true);
+
closeProcessor();
if (logger.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3948484..b178192 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -301,9 +301,6 @@ public class GatewaySenderEventRemoteDispatcher implements
* @throws GatewaySenderException
*/
public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{
- if (this.processor.isStopped()) {
- return null;
- }
// IF the connection is null
// OR the connection's ServerLocation doesn't match with the one stored in sender
// THEN initialize the connection
@@ -346,7 +343,7 @@ public class GatewaySenderEventRemoteDispatcher implements
if (con != null) {
if (!con.isDestroyed()) {
con.destroy();
- this.sender.getProxy().returnConnection(con);
+ this.sender.getProxy().returnConnection(con);
}
// Reset the connection so the next time through a new one will be
@@ -367,9 +364,6 @@ public class GatewaySenderEventRemoteDispatcher implements
*/
private void initializeConnection() throws GatewaySenderException,
GemFireSecurityException {
- if (this.processor.isStopped()) {
- return;
- }
this.connectionLifeCycleLock.writeLock().lock();
try {
// Attempt to acquire a connection
@@ -631,9 +625,9 @@ public class GatewaySenderEventRemoteDispatcher implements
}
} else {
// If we have received IOException.
- if (logger.isDebugEnabled()) {
+ // if (logger.isDebugEnabled()) {
logger.debug("{}: Received null ack from remote site.", processor.getSender());
- }
+ //}
processor.handleException();
try { // This wait is before trying to getting new connection to
// receive ack. Without this there will be continuous call to
@@ -729,11 +723,9 @@ public class GatewaySenderEventRemoteDispatcher implements
// not. No need to take lock as the reader thread may be blocked and we might not
// get chance to destroy unless that returns.
if (connection != null) {
- Connection conn = connection;
- shutDownAckReaderConnection();
- if (!conn.isDestroyed()) {
- conn.destroy();
- sender.getProxy().returnConnection(conn);
+ if (!connection.isDestroyed()) {
+ connection.destroy();
+ sender.getProxy().returnConnection(connection);
}
}
this.shutdown = true;
@@ -751,24 +743,12 @@ public class GatewaySenderEventRemoteDispatcher implements
logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
}
}
-
- private void shutDownAckReaderConnection() {
- Connection conn = connection;
- //attempt to unblock the ackreader thread by shutting down the inputStream, if it was stuck on a read
- try {
- if (conn != null && conn.getSocket() != null) {
- conn.getSocket().shutdownInput();
- }
- } catch (IOException e) {
- logger.warn("Unable to shutdown AckReaderThread Connection");
- }
- }
}
public void stopAckReaderThread() {
if (this.ackReaderThread != null) {
this.ackReaderThread.shutdown();
- }
+ }
}
@Override