You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2016/07/05 23:15:29 UTC

[06/32] incubator-geode git commit: GEODE-1588: AckReader and Dispatching thread are shut down before sending gateway sender close connection messages

GEODE-1588: AckReader and Dispatching thread are shut down before sending gateway sender close connection messages

* There was an issue where the gateway sender thread was reading from the same socket as the ack reader thread.
  Now we force the ack reader thread to stop first and shut down the socket's input stream, to prevent reading garbled data.
* Another issue was that the ack reader thread could be spun up again after it had been shut down.  Now we prevent the
  dispatching thread from doing so by checking whether the event processor has already been stopped.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8899fc8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8899fc8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8899fc8d

Branch: refs/heads/feature/GEODE-1571
Commit: 8899fc8d744bfd3060bafd17b1b33e02c7db9e5f
Parents: 58d7fca
Author: Jason Huynh <hu...@gmail.com>
Authored: Wed Jun 22 10:42:53 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Fri Jul 1 10:58:07 2016 -0700

----------------------------------------------------------------------
 .../AbstractGatewaySenderEventProcessor.java    |  5 ++-
 ...rentParallelGatewaySenderEventProcessor.java |  8 +++--
 ...urrentSerialGatewaySenderEventProcessor.java |  5 +--
 .../wan/GatewaySenderEventRemoteDispatcher.java | 34 ++++++++++++++++----
 4 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index ce08e8d..e3e1a9e 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1150,10 +1150,9 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
         }
       }
     }
-   
-    dispatcher.stop();
-    //set isStopped to true
+
     setIsStopped(true);
+    dispatcher.stop();
 
     if (this.isAlive()) {
       this.interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 07a3be5..82a53d3 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -237,6 +237,9 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
     if (!this.isAlive()) {
       return;
     }
+
+    setIsStopped(true);
+
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor Logger Group", logger);
 
@@ -248,12 +251,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
         return thread;
       }
     };
-    
+
     List<SenderStopperCallable> stopperCallables = new ArrayList<SenderStopperCallable>();
     for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
       stopperCallables.add(new SenderStopperCallable(parallelProcessor));
     }
-    
+
     ExecutorService stopperService = Executors.newFixedThreadPool(processors.length, threadFactory);
     try {
       List<Future<Boolean>> futures = stopperService.invokeAll(stopperCallables);
@@ -275,7 +278,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
       throw rejectedExecutionEx;
     }
     
-    setIsStopped(true);
     stopperService.shutdown();
     closeProcessor();
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index ff810ec..a557ce1 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -268,6 +268,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends
       return;
     }
 
+    setIsStopped(true);
+
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup(
             "ConcurrentSerialGatewaySenderEventProcessor Logger Group",
@@ -312,8 +314,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends
     }
     //shutdown the stopperService. This will release all the stopper threads
     stopperService.shutdown();
-    setIsStopped(true);
-    
+
     closeProcessor();
     
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8899fc8d/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index b178192..3948484 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -301,6 +301,9 @@ public class GatewaySenderEventRemoteDispatcher implements
    * @throws GatewaySenderException
    */
   public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{
+    if (this.processor.isStopped()) {
+      return null;
+    }
     // IF the connection is null 
     // OR the connection's ServerLocation doesn't match with the one stored in sender
     // THEN initialize the connection
@@ -343,7 +346,7 @@ public class GatewaySenderEventRemoteDispatcher implements
       if (con != null) {
         if (!con.isDestroyed()) {
           con.destroy();
-         this.sender.getProxy().returnConnection(con);
+          this.sender.getProxy().returnConnection(con);
         }
         
         // Reset the connection so the next time through a new one will be
@@ -364,6 +367,9 @@ public class GatewaySenderEventRemoteDispatcher implements
    */
   private void initializeConnection() throws GatewaySenderException,
       GemFireSecurityException {
+    if (this.processor.isStopped()) {
+      return;
+    }
     this.connectionLifeCycleLock.writeLock().lock(); 
     try {
       // Attempt to acquire a connection
@@ -625,9 +631,9 @@ public class GatewaySenderEventRemoteDispatcher implements
             }
           } else {
             // If we have received IOException.
-           // if (logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
               logger.debug("{}: Received null ack from remote site.", processor.getSender());
-            //}
+            }
             processor.handleException();
             try { // This wait is before trying to getting new connection to
                   // receive ack. Without this there will be continuous call to
@@ -723,9 +729,11 @@ public class GatewaySenderEventRemoteDispatcher implements
       // not. No need to take lock as the reader thread may be blocked and we might not
       // get chance to destroy unless that returns.
       if (connection != null) {
-        if (!connection.isDestroyed()) {
-          connection.destroy();
-          sender.getProxy().returnConnection(connection);
+        Connection conn = connection;
+        shutDownAckReaderConnection();
+        if (!conn.isDestroyed()) {
+          conn.destroy();
+          sender.getProxy().returnConnection(conn);
         }
       }
       this.shutdown = true;
@@ -743,12 +751,24 @@ public class GatewaySenderEventRemoteDispatcher implements
         logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
       }
     }
+
+    private void shutDownAckReaderConnection() {
+      Connection conn = connection;
+      //attempt to unblock the ackreader thread by shutting down the inputStream, if it was stuck on a read
+      try {
+        if (conn != null && conn.getSocket() != null) {
+          conn.getSocket().shutdownInput();
+        }
+      } catch (IOException e) {
+        logger.warn("Unable to shutdown AckReaderThread Connection");
+      }
+    }
   }
     
   public void stopAckReaderThread() {
     if (this.ackReaderThread != null) {
       this.ackReaderThread.shutdown();
-    }    
+    }
   }
   
   @Override