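You are viewing a plain-text version of this content. The canonical (HTML) version of this message is available in the original mailing-list archive; its link was lost in this text rendering.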
Posted to commits@thrift.apache.org by jk...@apache.org on 2017/09/21 19:49:23 UTC

thrift git commit: THRIFT-4251: Fix JDK Epoll Bug in Thrift of TThreadedSelectorServer model. Client: Java

Repository: thrift
Updated Branches:
  refs/heads/master 8506121b3 -> 9ffb41d94


THRIFT-4251: Fix JDK Epoll Bug in Thrift of TThreadedSelectorServer model.
Client: Java

This closes #1313


Project: http://git-wip-us.apache.org/repos/asf/thrift/repo
Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/9ffb41d9
Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/9ffb41d9
Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/9ffb41d9

Branch: refs/heads/master
Commit: 9ffb41d94c1c48c3b2a0ce8bdbd03d8b8f97195b
Parents: 8506121
Author: Johnny-Liao <10...@qq.com>
Authored: Tue Jul 25 14:23:28 2017 +0800
Committer: James E. King, III <jk...@apache.org>
Committed: Thu Sep 21 12:48:48 2017 -0700

----------------------------------------------------------------------
 .../server/AbstractNonblockingServer.java       |  46 ++++----
 .../thrift/server/TThreadedSelectorServer.java  | 108 ++++++++++++++++---
 2 files changed, 115 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/thrift/blob/9ffb41d9/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
----------------------------------------------------------------------
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 9c94b76..5c62b99 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -19,15 +19,6 @@
 
 package org.apache.thrift.server;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.thrift.TAsyncProcessor;
 import org.apache.thrift.TByteArrayOutputStream;
 import org.apache.thrift.TException;
@@ -42,6 +33,15 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Provides common methods and classes used by nonblocking TServer
  * implementations.
@@ -102,7 +102,7 @@ public abstract class AbstractNonblockingServer extends TServer {
 
   /**
    * Starts any threads required for serving.
-   * 
+   *
    * @return true if everything went ok, false if threads could not be started.
    */
   protected abstract boolean startThreads();
@@ -115,7 +115,7 @@ public abstract class AbstractNonblockingServer extends TServer {
 
   /**
    * Have the server transport start accepting connections.
-   * 
+   *
    * @return true if we started listening successfully, false if something went
    *         wrong.
    */
@@ -139,7 +139,7 @@ public abstract class AbstractNonblockingServer extends TServer {
   /**
    * Perform an invocation. This method could behave several different ways -
    * invoke immediately inline, queue for separate execution, etc.
-   * 
+   *
    * @return true if invocation was successfully requested, which is not a
    *         guarantee that invocation has completed. False if the request
    *         failed.
@@ -152,7 +152,7 @@ public abstract class AbstractNonblockingServer extends TServer {
    * corresponding to requests.
    */
   protected abstract class AbstractSelectThread extends Thread {
-    protected final Selector selector;
+    protected Selector selector;
 
     // List of FrameBuffers that want to change their selection interests.
     protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
@@ -285,21 +285,21 @@ public abstract class AbstractNonblockingServer extends TServer {
     protected ByteBuffer buffer_;
 
     protected final TByteArrayOutputStream response_;
-    
+
     // the frame that the TTransport should wrap.
     protected final TMemoryInputTransport frameTrans_;
-    
+
     // the transport that should be used to connect to clients
     protected final TTransport inTrans_;
-    
+
     protected final TTransport outTrans_;
-    
+
     // the input protocol to use on frames
     protected final TProtocol inProt_;
-    
+
     // the output protocol to use on frames
     protected final TProtocol outProt_;
-    
+
     // context associated with this connection
     protected final ServerContext context_;
 
@@ -328,7 +328,7 @@ public abstract class AbstractNonblockingServer extends TServer {
     /**
      * Give this FrameBuffer a chance to read. The selector loop should have
      * received a read event for this FrameBuffer.
-     * 
+     *
      * @return true if the connection should live on, false if it should be
      *         closed
      */
@@ -455,7 +455,7 @@ public abstract class AbstractNonblockingServer extends TServer {
     public void close() {
       // if we're being closed due to an error, we might have allocated a
       // buffer that we need to subtract for our memory accounting.
-      if (state_ == FrameBufferState.READING_FRAME || 
+      if (state_ == FrameBufferState.READING_FRAME ||
           state_ == FrameBufferState.READ_FRAME_COMPLETE ||
           state_ == FrameBufferState.AWAITING_CLOSE) {
         readBufferBytesAllocated.addAndGet(-buffer_.array().length);
@@ -510,7 +510,7 @@ public abstract class AbstractNonblockingServer extends TServer {
     public void invoke() {
       frameTrans_.reset(buffer_.array());
       response_.reset();
-      
+
       try {
         if (eventHandler_ != null) {
           eventHandler_.processContext(context_, inTrans_, outTrans_);
@@ -530,7 +530,7 @@ public abstract class AbstractNonblockingServer extends TServer {
 
     /**
      * Perform a read into buffer.
-     * 
+     *
      * @return true if the read succeeded, false if there was an error or the
      *         connection closed.
      */

http://git-wip-us.apache.org/repos/asf/thrift/blob/9ffb41d9/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
----------------------------------------------------------------------
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 353b8e0..038507e 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -19,7 +19,15 @@
 
 package org.apache.thrift.server;
 
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
@@ -37,24 +45,18 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * A Half-Sync/Half-Async server with a separate pool of threads to handle
  * non-blocking I/O. Accepts are handled on a single thread, and a configurable
  * number of nonblocking selector threads manage reading and writing of client
  * connections. A synchronous worker thread pool handles processing of requests.
- * 
+ *
  * Performs better than TNonblockingServer/THsHaServer in multi-core
  * environments when the the bottleneck is CPU on the single selector thread
  * handling I/O. In addition, because the accept handling is decoupled from
  * reads/writes and invocation, the server has better ability to handle back-
  * pressure from new connections (e.g. stop accepting when busy).
- * 
+ *
  * Like TNonblockingServer, it relies on the use of TFramedTransport.
  */
 public class TThreadedSelectorServer extends AbstractNonblockingServer {
@@ -205,7 +207,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
 
   /**
    * Start the accept and selector threads running to deal with clients.
-   * 
+   *
    * @return true if everything went ok, false if we couldn't start for some
    *         reason.
    */
@@ -349,7 +351,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
 
     /**
      * Set up the AcceptThead
-     * 
+     *
      * @throws IOException
      */
     public AcceptThread(TNonblockingServerTransport serverTransport,
@@ -478,10 +480,13 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
 
     // Accepted connections added by the accept thread.
     private final BlockingQueue<TNonblockingTransport> acceptedQueue;
+    private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
+    private long MONITOR_PERIOD = 1000L;
+    private int jvmBug = 0;
 
     /**
      * Set up the SelectorThread with an unbounded queue for incoming accepts.
-     * 
+     *
      * @throws IOException
      *           if a selector cannot be created
      */
@@ -491,7 +496,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
 
     /**
      * Set up the SelectorThread with an bounded queue for incoming accepts.
-     * 
+     *
      * @throws IOException
      *           if a selector cannot be created
      */
@@ -501,7 +506,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
 
     /**
      * Set up the SelectorThread with a specified queue for connections.
-     * 
+     *
      * @param acceptedQueue
      *          The BlockingQueue implementation for holding incoming accepted
      *          connections.
@@ -515,7 +520,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
     /**
      * Hands off an accepted connection to be handled by this thread. This
      * method will block if the queue for new connections is at capacity.
-     * 
+     *
      * @param accepted
      *          The connection that has been accepted.
      * @return true if the connection has been successfully added.
@@ -566,8 +571,8 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
      */
     private void select() {
       try {
-        // wait for io events.
-        selector.select();
+
+        doSelect();
 
         // process the io events we received
         Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
@@ -596,6 +601,77 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer {
       }
     }
 
+    /**
+     * Do select and judge epoll bug happen.
+     * See : https://issues.apache.org/jira/browse/THRIFT-4251
+     */
+    private void doSelect() throws IOException {
+      long beforeSelect = System.currentTimeMillis();
+      int selectedNums = selector.select();
+      long afterSelect = System.currentTimeMillis();
+
+      if (selectedNums == 0) {
+        jvmBug++;
+      } else {
+        jvmBug = 0;
+      }
+
+      long selectedTime = afterSelect - beforeSelect;
+      if (selectedTime >= MONITOR_PERIOD) {
+        jvmBug = 0;
+      } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
+        LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
+        rebuildSelector();
+        selector.selectNow();
+        jvmBug = 0;
+      }
+
+    }
+
+    /**
+     * Replaces the current Selector of this SelectorThread with newly created Selector to work
+     * around the infamous epoll 100% CPU bug.
+     */
+    private synchronized void rebuildSelector() {
+      final Selector oldSelector = selector;
+      if (oldSelector == null) {
+        return;
+      }
+      Selector newSelector = null;
+      try {
+        newSelector = Selector.open();
+        LOGGER.warn("Created new Selector.");
+      } catch (IOException e) {
+        LOGGER.error("Create new Selector error.", e);
+      }
+
+      for (SelectionKey key : oldSelector.selectedKeys()) {
+        if (!key.isValid() && key.readyOps() == 0)
+          continue;
+        SelectableChannel channel = key.channel();
+        Object attachment = key.attachment();
+
+        try {
+          if (attachment == null) {
+            channel.register(newSelector, key.readyOps());
+          } else {
+            channel.register(newSelector, key.readyOps(), attachment);
+          }
+        } catch (ClosedChannelException e) {
+          LOGGER.error("Register new selector key error.", e);
+        }
+
+      }
+
+      selector = newSelector;
+      try {
+        oldSelector.close();
+      } catch (IOException e) {
+        LOGGER.error("Close old selector error.", e);
+      }
+      LOGGER.warn("Replace new selector success.");
+    }
+
     private void processAcceptedConnections() {
       // Register accepted connections
       while (!stopped_) {