You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2015/09/16 01:47:54 UTC

[1/2] incubator-geode git commit: p2p readers and handshakers threads are now pooled

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-332 [created] 6d6c760cc


p2p readers and handshakers threads are now pooled


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/fa017689
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/fa017689
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/fa017689

Branch: refs/heads/feature/GEODE-332
Commit: fa017689ef73d9003eade82c6cb27634c03be05f
Parents: 4e65f0c
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 15 16:03:40 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Sep 15 16:03:40 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/tcp/Connection.java        | 58 +++++++++++--------
 .../gemfire/internal/tcp/ConnectionTable.java   | 61 +++++++++++++++++++-
 2 files changed, 93 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
index cd1b7dc..630ecfe 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java
@@ -115,6 +115,11 @@ public class Connection implements Runnable {
 
   /** the table holding this connection */
   final ConnectionTable owner;
+  
+  /** Set to false once run() is terminating. Using this instead of Thread.isAlive  
+    * as the reader thread may be a pooled thread.
+    */ 
+  private volatile boolean isRunning = false; 
 
   /** true if connection is a shared resource that can be used by more than one thread */
   private boolean sharedResource;
@@ -136,11 +141,14 @@ public class Connection implements Runnable {
   }
 
   private final static ThreadLocal isReaderThread = new ThreadLocal();
-  // return true if this thread is a reader thread
   public final static void makeReaderThread() {
     // mark this thread as a reader thread
-    isReaderThread.set(Boolean.TRUE);
+    makeReaderThread(true);
   }
+  private final static void makeReaderThread(boolean v) {
+    isReaderThread.set(v);
+  }
+  // return true if this thread is a reader thread
   public final static boolean isReaderThread() {
     Object o = isReaderThread.get();
     if (o == null) {
@@ -523,7 +531,7 @@ public class Connection implements Runnable {
     Connection c = new Connection(t, s);
     boolean readerStarted = false;
     try {
-      c.startReader();
+      c.startReader(t);
       readerStarted = true;
     } finally {
       if (!readerStarted) {
@@ -822,11 +830,12 @@ public class Connection implements Runnable {
     Runnable r = new Runnable() {
       public void run() {
         boolean rShuttingDown = readerShuttingDown;
+        final Thread localRef = readerThread;
         synchronized(stateLock) {
-          if (readerThread != null && readerThread.isAlive() &&
+          if (localRef != null && isRunning &&
               !rShuttingDown && connectionState == STATE_READING
               || connectionState == STATE_READING_ACK) {
-            readerThread.interrupt();
+            localRef.interrupt();
           }
         }
       }
@@ -951,7 +960,7 @@ public class Connection implements Runnable {
    *
    * @throws IOException if handshake fails
    */
-  private void attemptHandshake() throws IOException {
+  private void attemptHandshake(ConnectionTable connTable) throws IOException {
     // send HANDSHAKE
     // send this server's port.  It's expected on the other side
     if (useNIO()) {
@@ -961,7 +970,7 @@ public class Connection implements Runnable {
       handshakeStream();
     }
 
-    startReader(); // this reader only reads the handshake and then exits
+    startReader(connTable); // this reader only reads the handshake and then exits
     waitForHandshake(); // waiting for reply
   }
 
@@ -1099,7 +1108,7 @@ public class Connection implements Runnable {
         if (conn != null) {
           // handshake
           try {
-            conn.attemptHandshake();
+            conn.attemptHandshake(t);
             if (conn.isSocketClosed()) {
               // something went wrong while reading the handshake
               // and the socket was closed or this guy sent us a
@@ -1601,14 +1610,14 @@ public class Connection implements Runnable {
         this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) {
       isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
     }
-    if (!beingSick && this.readerThread != null && !isIBM && this.readerThread.isAlive()
+    if (!beingSick && this.readerThread != null && !isIBM && this.isRunning
         && this.readerThread != Thread.currentThread()) {
       try {
         this.readerThread.join(500);
-        if (this.readerThread.isAlive() && !this.readerShuttingDown
+        if (this.isRunning && !this.readerShuttingDown
             && owner.getDM().getRootCause() == null) { // don't wait twice if there's a system failure
           this.readerThread.join(1500);
-          if (this.readerThread.isAlive()) {
+          if (this.isRunning) {
             logger.info(LocalizedMessage.create(LocalizedStrings.Connection_TIMED_OUT_WAITING_FOR_READERTHREAD_ON_0_TO_FINISH, this));
           }
         }
@@ -1677,26 +1686,22 @@ public class Connection implements Runnable {
   }
 
   /** starts a reader thread */
-  private void startReader() {
-    ThreadGroup group =
-      LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
-    Assert.assertTrue(this.readerThread == null);
-    this.readerThread =
-      new Thread(group, this, p2pReaderName());
-    this.readerThread.setDaemon(true);
-    stopped = false;
-    this.readerThread.start();
-  }
+  private void startReader(ConnectionTable connTable) { 
+    Assert.assertTrue(!this.isRunning); 
+    stopped = false; 
+    this.isRunning = true; 
+    connTable.executeCommand(this);  
+  } 
 
 
   /** in order to read non-NIO socket-based messages we need to have a thread
       actively trying to grab bytes out of the sockets input queue.
       This is that thread. */
   public void run() {
+    this.readerThread = Thread.currentThread();
+    this.readerThread.setName(p2pReaderName());
     ConnectionTable.threadWantsSharedResources();
-    if (this.isReceiver) {
-      makeReaderThread();
-    }
+    makeReaderThread(this.isReceiver);
     try {
       if (useNIO()) {
         runNioReader();
@@ -1725,6 +1730,9 @@ public class Connection implements Runnable {
       // for the handshake.
       // see bug 37524 for an example of listeners hung in waitForHandshake
       notifyHandshakeWaiter(false);
+      this.isRunning = false;
+      this.readerThread.setName("idle p2p reader");
+      this.readerThread = null;
     } // finally
   }
 
@@ -3307,7 +3315,7 @@ public class Connection implements Runnable {
   protected Object stateLock = new Object();
   
   /** for timeout processing, this is the current state of the connection */
-  protected byte connectionState;
+  protected byte connectionState = STATE_IDLE;
   
   /*~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~*/
   /** the connection is idle, but may be in use */

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/fa017689/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 9beb947..525c687 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -19,8 +19,15 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.logging.log4j.Logger;
@@ -37,6 +44,7 @@ import com.gemstone.gemfire.internal.SocketCreator;
 import com.gemstone.gemfire.internal.SystemTimer;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.LoggingThreadGroup;
 import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 
@@ -129,7 +137,13 @@ public class ConnectionTable  {
    */
   private volatile boolean closed = false;
 
-
+  /**
+   * Executor used by p2p reader and p2p handshaker threads.
+   */
+  private Executor p2pReaderThreadPool;
+  /** Number of seconds to wait before timing out an unused p2p reader thread. Default is 120 (2 minutes). */
+  private final static long READER_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.READER_POOL_KEEP_ALIVE_TIME", 120).longValue();
+  
   /**
    * The most recent instance to be created
    * 
@@ -202,11 +216,40 @@ public class ConnectionTable  {
     this.threadOrderedConnMap = new ThreadLocal();
     this.threadConnMaps = new ArrayList();
     this.threadConnectionMap = new ConcurrentHashMap();
+    this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
   /*  NOMUX: if (TCPConduit.useNIO) {
       inputMuxManager = new InputMuxManager(this);
       inputMuxManager.start(c.logger);
     }*/
   }
+  
+  private Executor createThreadPoolForIO(boolean conserveSockets) { 
+    Executor executor = null; 
+    final ThreadGroup connectionRWGroup = LoggingThreadGroup.createThreadGroup("P2P Reader Threads", logger);
+    if (conserveSockets) { 
+      executor = new Executor() { 
+        @Override 
+        public void execute(Runnable command) { 
+          Thread th = new Thread(connectionRWGroup, command); 
+          th.setDaemon(true); 
+          th.start(); 
+        } 
+      }; 
+    } 
+    else { 
+      BlockingQueue synchronousQueue = new SynchronousQueue(); 
+      ThreadFactory tf = new ThreadFactory() { 
+        public Thread newThread(final Runnable command) { 
+          Thread thread = new Thread(connectionRWGroup, command); 
+          thread.setDaemon(true); 
+          return thread; 
+        } 
+      }; 
+      executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, READER_POOL_KEEP_ALIVE_TIME, 
+          TimeUnit.SECONDS, synchronousQueue, tf); 
+    } 
+    return executor; 
+  } 
 
   /** conduit sends connected() after establishing the server socket */
 //   protected void connected() {
@@ -715,6 +758,15 @@ public class ConnectionTable  {
         this.threadConnMaps.clear();
       }
     }
+    {
+      Executor localExec = this.p2pReaderThreadPool;
+      if (localExec != null) {
+        if (localExec instanceof ExecutorService) {
+          ((ExecutorService)localExec).shutdown();
+        }
+        this.p2pReaderThreadPool = null;
+      }
+    }
     closeReceivers(false);
     
     Map m = (Map)this.threadOrderedConnMap.get();
@@ -726,6 +778,13 @@ public class ConnectionTable  {
     }
   }
 
+  public void executeCommand(Runnable runnable) { 
+    Executor local = this.p2pReaderThreadPool;
+    if (local != null) {
+      local.execute(runnable);
+    }
+  }
+  
   /**
    * Close all receiving threads.  This is used during shutdown and is also
    * used by a test hook that makes us deaf to incoming messages.


[2/2] incubator-geode git commit: async close now done with thread pool

Posted by ds...@apache.org.
async close now done with thread pool


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/6d6c760c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/6d6c760c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/6d6c760c

Branch: refs/heads/feature/GEODE-332
Commit: 6d6c760ccb17d4bd0d683072f1d79dc895926c27
Parents: fa01768
Author: Darrel Schneider <ds...@pivotal.io>
Authored: Tue Sep 15 16:47:17 2015 -0700
Committer: Darrel Schneider <ds...@pivotal.io>
Committed: Tue Sep 15 16:47:17 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/SocketCreator.java         | 69 +++++++++++++-------
 .../gemfire/internal/tcp/ConnectionTable.java   |  1 +
 2 files changed, 48 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
index ff4a22c..0688c3d 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/SocketCreator.java
@@ -81,6 +81,11 @@ import com.gemstone.gemfire.internal.util.PasswordUtil;
 import com.gemstone.org.jgroups.util.ConnectionWatcher;
 
 import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.*;
 
@@ -1197,6 +1202,40 @@ public class SocketCreator  implements com.gemstone.org.jgroups.util.SockCreator
     return (String[]) v.toArray( new String[ v.size() ] );
   }
   
+  /** thread pool of async close threads */
+  private static ThreadPoolExecutor asyncCloseExecutor;
+  /** Number of seconds to wait before timing out an unused async close thread. Default is 120 (2 minutes). */
+  private final static long ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME = Long.getLong("p2p.ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME", 120).longValue();
+  
+  private static synchronized ThreadPoolExecutor getAsyncThreadExecutor() {
+    ThreadPoolExecutor pool = asyncCloseExecutor;
+    if (pool == null) {
+      final ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
+      ThreadFactory tf = new ThreadFactory() { 
+        public Thread newThread(final Runnable command) { 
+          Thread thread = new Thread(tg, command); 
+          thread.setDaemon(true);
+          return thread;
+        } 
+      }; 
+      BlockingQueue synchronousQueue = new SynchronousQueue(); 
+      pool = new ThreadPoolExecutor(1, Integer.MAX_VALUE, ASYNC_CLOSE_POOL_KEEP_ALIVE_TIME, TimeUnit.SECONDS, synchronousQueue, tf);
+      asyncCloseExecutor = pool;
+    }
+    return pool;
+  }
+  public static synchronized void closeAsyncThreadExecutor() {
+    ThreadPoolExecutor pool = asyncCloseExecutor;
+    if (pool != null) {
+      pool.shutdownNow();
+      asyncCloseExecutor = null;
+    }
+  }
+  private static synchronized void asyncExecute(Runnable r) {
+    // The old code waited 50ms for the async task to complete.
+    // Should this code use submit on the executor and also wait 50ms?
+    getAsyncThreadExecutor().execute(r);
+  }
   /**
    * Closes the specified socket in a background thread and waits a limited
    * amount of time for the close to complete. In some cases we see close
@@ -1206,44 +1245,30 @@ public class SocketCreator  implements com.gemstone.org.jgroups.util.SockCreator
    * @param who who the socket is connected to
    * @param extra an optional Runnable with stuff to execute in the async thread
    */
-  public static void asyncClose(final Socket sock, String who, final Runnable extra) {
+  public static void asyncClose(final Socket sock, final String who, final Runnable extra) {
     if (sock == null || sock.isClosed()) {
       return;
     }
     try {
-    ThreadGroup tg = LoggingThreadGroup.createThreadGroup("Socket asyncClose", logger);
-
-    Thread t = new Thread(tg, new Runnable() {
+      asyncExecute(new Runnable() {
         public void run() {
+          Thread.currentThread().setName("AsyncSocketCloser for " + who);
+          try {
           if (extra != null) {
             extra.run();
           }
           inlineClose(sock);
+          } finally {
+            Thread.currentThread().setName("unused AsyncSocketCloser");
+          }
         }
-      }, "AsyncSocketCloser for " + who);
-    t.setDaemon(true);
-    try {
-      t.start();
+      });
     } catch (OutOfMemoryError ignore) {
       // If we can't start a thread to close the socket just do it inline.
       // See bug 50573.
       inlineClose(sock);
       return;
     }
-    try {
-      // [bruce] if the network fails, this will wait the full amount of time
-      // on every close, so it must be kept very short.  it was 750ms before,
-      // causing frequent hangs in net-down hydra tests
-      t.join(50/*ms*/);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-    }
-    catch (VirtualMachineError e) {
-      SystemFailure.initiateFailure(e);
-      // NOTREACHED
-      throw e;
-    }
   }
   
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6d6c760c/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
index 525c687..84ea1eb 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java
@@ -776,6 +776,7 @@ public class ConnectionTable  {
         m.clear();
       }        
     }
+    SocketCreator.closeAsyncThreadExecutor();
   }
 
   public void executeCommand(Runnable runnable) {