You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/07/03 17:52:48 UTC

incubator-geode git commit: Revert "GEODE-1588: AckReader and Dispatching thread are shut down before sending gateway sender close connection messages"

Repository: incubator-geode
Updated Branches:
  refs/heads/develop ec3555f16 -> 96f3af9de


Revert "GEODE-1588: AckReader and Dispatching thread are shut down before sending gateway sender close connection messages"

This reverts commit 8899fc8d744bfd3060bafd17b1b33e02c7db9e5f.
This checkin causes some instability with the wan dunit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/96f3af9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/96f3af9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/96f3af9d

Branch: refs/heads/develop
Commit: 96f3af9dec1993cdd2d2109d6a711a74a56afcc3
Parents: ec3555f
Author: Jason Huynh <hu...@gmail.com>
Authored: Sun Jul 3 10:52:28 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Sun Jul 3 10:52:28 2016 -0700

----------------------------------------------------------------------
 .../AbstractGatewaySenderEventProcessor.java    |  5 +--
 ...rentParallelGatewaySenderEventProcessor.java |  8 ++---
 ...urrentSerialGatewaySenderEventProcessor.java |  5 ++-
 .../wan/GatewaySenderEventRemoteDispatcher.java | 34 ++++----------------
 4 files changed, 15 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index e3e1a9e..ce08e8d 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -1150,9 +1150,10 @@ public abstract class AbstractGatewaySenderEventProcessor extends Thread {
         }
       }
     }
-
-    setIsStopped(true);
+   
     dispatcher.stop();
+    //set isStopped to true
+    setIsStopped(true);
 
     if (this.isAlive()) {
       this.interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
index 82a53d3..07a3be5 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
@@ -237,9 +237,6 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
     if (!this.isAlive()) {
       return;
     }
-
-    setIsStopped(true);
-
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup("ConcurrentParallelGatewaySenderEventProcessor Logger Group", logger);
 
@@ -251,12 +248,12 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
         return thread;
       }
     };
-
+    
     List<SenderStopperCallable> stopperCallables = new ArrayList<SenderStopperCallable>();
     for (ParallelGatewaySenderEventProcessor parallelProcessor : this.processors) {
       stopperCallables.add(new SenderStopperCallable(parallelProcessor));
     }
-
+    
     ExecutorService stopperService = Executors.newFixedThreadPool(processors.length, threadFactory);
     try {
       List<Future<Boolean>> futures = stopperService.invokeAll(stopperCallables);
@@ -278,6 +275,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor extends AbstractGatew
       throw rejectedExecutionEx;
     }
     
+    setIsStopped(true);
     stopperService.shutdown();
     closeProcessor();
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index a557ce1..ff810ec 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -268,8 +268,6 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends
       return;
     }
 
-    setIsStopped(true);
-
     final LoggingThreadGroup loggingThreadGroup = LoggingThreadGroup
         .createThreadGroup(
             "ConcurrentSerialGatewaySenderEventProcessor Logger Group",
@@ -314,7 +312,8 @@ public class ConcurrentSerialGatewaySenderEventProcessor extends
     }
     //shutdown the stopperService. This will release all the stopper threads
     stopperService.shutdown();
-
+    setIsStopped(true);
+    
     closeProcessor();
     
     if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/96f3af9d/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 3948484..b178192 100644
--- a/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/com/gemstone/gemfire/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -301,9 +301,6 @@ public class GatewaySenderEventRemoteDispatcher implements
    * @throws GatewaySenderException
    */
   public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException{
-    if (this.processor.isStopped()) {
-      return null;
-    }
     // IF the connection is null 
     // OR the connection's ServerLocation doesn't match with the one stored in sender
     // THEN initialize the connection
@@ -346,7 +343,7 @@ public class GatewaySenderEventRemoteDispatcher implements
       if (con != null) {
         if (!con.isDestroyed()) {
           con.destroy();
-          this.sender.getProxy().returnConnection(con);
+         this.sender.getProxy().returnConnection(con);
         }
         
         // Reset the connection so the next time through a new one will be
@@ -367,9 +364,6 @@ public class GatewaySenderEventRemoteDispatcher implements
    */
   private void initializeConnection() throws GatewaySenderException,
       GemFireSecurityException {
-    if (this.processor.isStopped()) {
-      return;
-    }
     this.connectionLifeCycleLock.writeLock().lock(); 
     try {
       // Attempt to acquire a connection
@@ -631,9 +625,9 @@ public class GatewaySenderEventRemoteDispatcher implements
             }
           } else {
             // If we have received IOException.
-            if (logger.isDebugEnabled()) {
+           // if (logger.isDebugEnabled()) {
               logger.debug("{}: Received null ack from remote site.", processor.getSender());
-            }
+            //}
             processor.handleException();
             try { // This wait is before trying to getting new connection to
                   // receive ack. Without this there will be continuous call to
@@ -729,11 +723,9 @@ public class GatewaySenderEventRemoteDispatcher implements
       // not. No need to take lock as the reader thread may be blocked and we might not
       // get chance to destroy unless that returns.
       if (connection != null) {
-        Connection conn = connection;
-        shutDownAckReaderConnection();
-        if (!conn.isDestroyed()) {
-          conn.destroy();
-          sender.getProxy().returnConnection(conn);
+        if (!connection.isDestroyed()) {
+          connection.destroy();
+          sender.getProxy().returnConnection(connection);
         }
       }
       this.shutdown = true;
@@ -751,24 +743,12 @@ public class GatewaySenderEventRemoteDispatcher implements
         logger.warn(LocalizedMessage.create(LocalizedStrings.GatewaySender_ACKREADERTHREAD_IGNORED_CANCELLATION));
       }
     }
-
-    private void shutDownAckReaderConnection() {
-      Connection conn = connection;
-      //attempt to unblock the ackreader thread by shutting down the inputStream, if it was stuck on a read
-      try {
-        if (conn != null && conn.getSocket() != null) {
-          conn.getSocket().shutdownInput();
-        }
-      } catch (IOException e) {
-        logger.warn("Unable to shutdown AckReaderThread Connection");
-      }
-    }
   }
     
   public void stopAckReaderThread() {
     if (this.ackReaderThread != null) {
       this.ackReaderThread.shutdown();
-    }
+    }    
   }
   
   @Override