You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/22 14:51:09 UTC

svn commit: r1388798 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Author: mbautin
Date: Sat Sep 22 12:51:08 2012
New Revision: 1388798

URL: http://svn.apache.org/viewvc?rev=1388798&view=rev
Log:
[jira] [HBASE-6619] [89-fb] Do not unregister and re-register interest ops in RPC

Author: michalgr

Summary:
Improvements in how we deal with incoming requests. Right now we use a deserialization pool. Every time we get a read event on a selector in the listener, we deregister the read operation and pass the descriptor to the deserialization pool. Then some thread (one of #CPUs + 1) reads some bytes, registers read interest again, and wakes up the listener.

My changes introduce a pool of Reader threads. Those threads have their own selectors, so they do not have to register/deregister anything all the time.

This patch also delays parsing of incoming requests. RawCalls are queued, and each one is then parsed in the Handler thread that takes it from the queue.

Test Plan:
Unit tests, 4 failures:

TestDistributedLogSplitAtStartup
TestSplitLogWorker
TestDistributedLogSplitting
TestLogSplitOnMasterFailover

All 4 also fail without these changes.

Impact on performance can be seen here: https://our.intern.facebook.com/intern/wiki/index.php/HBase/PerfExperiments/HBase-versus-Hypertable

Reviewers: kranganathan

Reviewed By: kranganathan

CC: JIRA, Kannan, aaiyer, avf, mbautin, Liyin, gqchen

Differential Revision: https://reviews.facebook.net/D5283

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java?rev=1388798&r1=1388797&r2=1388798&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseServer.java Sat Sep 22 12:51:08 2012
@@ -44,26 +44,16 @@ import java.nio.channels.ServerSocketCha
 import java.nio.channels.SocketChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.SizeBasedThrottler;
 import org.apache.hadoop.hbase.io.WritableWithSize;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
-import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -194,7 +184,7 @@ public abstract class HBaseServer {
   private static final String RESPONSE_QUEUES_MAX_SIZE = "ipc.server.response.queue.maxsize";
 
   volatile protected boolean running = true;         // true while server runs
-  protected BlockingQueue<Call> callQueue; // queued calls
+  protected BlockingQueue<RawCall> callQueue; // queued calls
 
   protected final List<Connection> connectionList =
     Collections.synchronizedList(new LinkedList<Connection>());
@@ -254,13 +244,13 @@ public abstract class HBaseServer {
     protected String tag = null;
     protected long partialResponseSize; // size of the results collected so far
 
-    public Call(int id, Writable param, Connection connection) {
+    public Call(int id, Writable param, Connection connection, long timestamp) {
       this.id = id;
       this.param = param;
       this.connection = connection;
-      this.timestamp = System.currentTimeMillis();
       this.response = null;
       this.partialResponseSize = 0;
+      this.timestamp = timestamp;
     }
     
     public void setTag(String tag) {
@@ -309,7 +299,7 @@ public abstract class HBaseServer {
     }
   }
 
-  /** Listens on the socket. Creates jobs for the handler threads*/
+  /** Listens on the socket, accepts new connections and hands them over to readers */
   private class Listener extends HasThread {
 
     private ServerSocketChannel acceptChannel = null; //the accept channel
@@ -322,8 +312,14 @@ public abstract class HBaseServer {
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
     
-    /** The ipc deserialization thread pool */
-    protected ThreadPoolExecutor deserializationThreadPool;
+    private final int readerCount = conf.getInt("ipc.server.reader.count",
+        Runtime.getRuntime().availableProcessors() + 1); // number of reader threads
+
+    /* guarded by lock on this */
+    private final Reader[] readers;
+
+    /* guarded by lock on this */
+    private int readerRRIndex = 0; // New connections are passed to readers round-robin way.
 
     public Listener() throws IOException {
       // this will trigger a DNS lookup
@@ -343,30 +339,9 @@ public abstract class HBaseServer {
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
       
-      // initialize the ipc deserializationThreadPool thread pool
-      int deserializationPoolMaxSize = conf.getInt("ipc.server.deserialization.threadPool.maxSize", 
-          Runtime.getRuntime().availableProcessors() + 1);
-      deserializationThreadPool = new ThreadPoolExecutor(
-          1, // the core pool size
-          deserializationPoolMaxSize, // the max pool size
-          60L, TimeUnit.SECONDS, // keep-alive time for each worker thread
-          new SynchronousQueue<Runnable>(), // direct handoffs
-          new DaemonThreadFactory("IPC-Deserialization"),
-          new RejectedExecutionHandler() {
-            @Override
-            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-              try {
-                // The submission (listener) thread will be blocked until the thread pool frees up.
-                executor.getQueue().put(r);
-              } catch (InterruptedException e) {
-                throw new RejectedExecutionException(
-                    "Failed to requeue the rejected request because of ", e);
-              }
-            }
-          });
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Initialize the deserializationThreadPool with maxium " + 
-            deserializationPoolMaxSize + " threads");
+      readers = new Reader[readerCount];
+      for (int i = 0; i < readerCount; i++) {
+        readers[i] = new Reader(i);
       }
     }
 
@@ -426,44 +401,26 @@ public abstract class HBaseServer {
       SERVER.set(HBaseServer.this);
 
       while (running) {
-        SelectionKey key = null;
         try {
           selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
-          while (iter.hasNext()) {
-            key = iter.next();
+          while (iter.hasNext() && running) {
+            SelectionKey key = iter.next();
             iter.remove();
             try {
               if (key.isValid()) {
                 if (key.isAcceptable()) {
                   doAccept(key);
-                } else if (key.isReadable()) {
-                  doAsyncRead(key);
+                } else {
+                  LOG.warn("Woken on not acceptable channel");
                 }
               }
             } catch (IOException ignored) {
             }
             key = null;
           }
-        } catch (OutOfMemoryError e) {
-          if (errorHandler != null) {
-            if (errorHandler.checkOOME(e)) {
-              LOG.info(getName() + ": exiting on OOME");
-              closeCurrentConnection(key);
-              cleanupConnections(true);
-              return;
-            }
-          } else {
-            // we can run out of memory if we have too many threads
-            // log the event and sleep for a minute and give
-            // some thread(s) a chance to finish
-            LOG.warn("Out of Memory in server select", e);
-            closeCurrentConnection(key);
-            cleanupConnections(true);
-            try { Thread.sleep(60000); } catch (Exception ignored) {}
-          }
-        } catch (Exception e) {
-          closeCurrentConnection(key);
+        } catch (IOException e) {
+          LOG.error("Exception caught: " + e.toString());
         }
         cleanupConnections(false);
       }
@@ -485,48 +442,6 @@ public abstract class HBaseServer {
       }
     }
 
-    private void doAsyncRead(final SelectionKey readSelectionKey) {
-      unsetReadInterest(readSelectionKey);
-      
-      // submit the doRead request to the thread pool in order to deserialize the data in parallel
-      try {
-        deserializationThreadPool.submit(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              doRead(readSelectionKey);
-            } catch (InterruptedException e) {
-              if (LOG.isTraceEnabled()) {
-                LOG.trace("Caught: " + StringUtils.stringifyException(e) +
-                    " when processing " + readSelectionKey.attachment());
-              }
-            } finally {
-              setReadInterest(readSelectionKey);
-              // wake up the selector from the blocking function select()
-              selector.wakeup();
-            }
-          }
-        });
-      } catch (Throwable e) {
-        setReadInterest(readSelectionKey);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Caught " + e.getMessage() + " when processing the remote connection " +
-              readSelectionKey.attachment().toString());
-        }
-      }
-    }
-
-    private void closeCurrentConnection(SelectionKey key) {
-      if (key != null) {
-        Connection c = (Connection)key.attachment();
-        if (c != null) {
-          if (LOG.isTraceEnabled())
-            LOG.trace(getName() + ": disconnecting client " + c.getHostAddress());
-          closeConnection(c);
-        }
-      }
-    }
-
     InetSocketAddress getAddress() {
       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
     }
@@ -542,9 +457,8 @@ public abstract class HBaseServer {
         channel.configureBlocking(false);
         channel.socket().setTcpNoDelay(tcpNoDelay);
         channel.socket().setKeepAlive(tcpKeepAlive);
-        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
         c = new Connection(channel, System.currentTimeMillis());
-        readKey.attach(c);
+        getReader().addConnection(c);
         synchronized (connectionList) {
           connectionList.add(numConnections, c);
           numConnections++;
@@ -556,7 +470,129 @@ public abstract class HBaseServer {
       }
     }
 
-    void doRead(SelectionKey key) throws InterruptedException {
+    private synchronized Reader getReader() {
+      if (readerRRIndex == readerCount) {
+        readerRRIndex = 0;
+      }
+
+      return readers[readerRRIndex++];
+    }
+
+    @Override
+    public synchronized void start(){
+      for (int i = 0; i < readerCount; i++) {
+        readers[i].start();
+      }
+
+      super.start();
+    }
+
+    synchronized void doStop() {
+      if (selector != null) {
+        selector.wakeup();
+        Thread.yield();
+      }
+      if (acceptChannel != null) {
+        try {
+          acceptChannel.socket().close();
+        } catch (IOException e) {
+          LOG.warn(getName() + ":Exception in closing listener socket. " + e);
+        }
+      }
+
+      if (readers != null) {
+        for (Reader reader : readers) {
+          if (reader != null) {
+            try{
+              reader.stop();
+            } catch (IOException e) {
+              LOG.warn("Caught: " + e.toString());
+            }
+          }
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Reader is a thread that reads from connections.
+   * Listener thread hands accepted connections over to Readers.
+   * Every Reader has its own set of connections to read from and
+   * Listener makes sure, that one connection is handled exactly by
+   * one Reader
+   */
+  private class Reader extends HasThread {
+
+    private static final int CONNECTION_QUEUE_SIZE = 10;
+
+    private final Selector selector;
+
+    /*
+     * This is a list of new connections. When Listener signals that Reader
+     * should start reading from new connection, this connection is added to
+     * newConnections list. All connections from this list are periodically
+     * registered with Reader's selector and then this list is cleared.
+     */
+    private final List<Connection> newConnections;
+
+    /*
+     * thread id for logging etc
+     */
+    private final int threadId;
+
+    public Reader(int threadId) throws IOException {
+      this.newConnections = new ArrayList<Connection>(CONNECTION_QUEUE_SIZE);
+      this.selector = Selector.open();
+      this.threadId = threadId;
+
+      this.setDaemon(true);
+      this.setName(String.format("Reader %d", threadId));
+    }
+
+    public void addConnection(Connection connection) {
+      if (running){
+        synchronized (newConnections) {
+          newConnections.add(connection);
+        }
+        selector.wakeup();
+      }
+    }
+
+    private void registerNewConnections() {
+      synchronized (newConnections) {
+        if (newConnections.isEmpty()) {
+          return;
+        }
+
+        for (Connection conn : newConnections) {
+          try {
+            SelectionKey readKey = conn.channel.register(selector, SelectionKey.OP_READ);
+            readKey.attach(conn);
+          } catch (ClosedChannelException e) {
+            // It is ok.
+          }
+        }
+        newConnections.clear();
+      }
+    }
+
+    public void stop() throws IOException{
+      selector.close();
+      interrupt();
+    }
+
+    private void processKey(SelectionKey key) throws InterruptedException{
+      if (key.isValid()) {
+        if (key.isReadable()) {
+          doRead(key);
+        } else {
+          LOG.warn(String.format("Reader %d woken up on nonreadable connection.", threadId));
+        }
+      }
+    }
+
+    private void doRead(SelectionKey key) throws InterruptedException{
       int count = 0;
       Connection c = (Connection)key.attachment();
       if (c == null) {
@@ -565,7 +601,7 @@ public abstract class HBaseServer {
       c.setLastContact(System.currentTimeMillis());
 
       try {
-        count = c.readAndProcess();
+        count = c.readAndQueue();
       } catch (InterruptedException ieo) {
         throw ieo;
       } catch (Exception e) {
@@ -588,16 +624,56 @@ public abstract class HBaseServer {
       }
     }
 
-    synchronized void doStop() {
-      if (selector != null) {
-        selector.wakeup();
-        Thread.yield();
+    @Override
+    public void run() {
+      LOG.info(this.getName() + " started");
+      SelectionKey key = null;
+      while(running) {
+        try{
+          int count = selector.select();
+          if (count > 0) {
+            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+            while (keys.hasNext()) {
+              key = keys.next();
+              keys.remove();
+              processKey(key);
+            }
+          }
+
+          registerNewConnections();
+        } catch (OutOfMemoryError e) {
+          if (errorHandler != null) {
+            if (errorHandler.checkOOME(e)) {
+              LOG.info(getName() + ": exiting on OOME");
+              closeCurrentConnection(key);
+              running = false;
+              listener.selector.wakeup();
+              return;
+            }
+          } else {
+            // we can run out of memory if we have too many threads
+            // log the event and sleep for a minute and give
+            // some thread(s) a chance to finish
+            LOG.warn("Out of Memory in server select", e);
+            closeCurrentConnection(key);
+            listener.cleanupConnections(true);
+            try { sleep(60000); } catch (InterruptedException ignored) {}
+          }
+        } catch (Exception e) {
+          closeCurrentConnection(key);
+        }
       }
-      if (acceptChannel != null) {
-        try {
-          acceptChannel.socket().close();
-        } catch (IOException e) {
-          LOG.warn(getName() + ":Exception in closing listener socket. " + e);
+
+      LOG.info(this.getName() + " finished");
+    }
+
+    private void closeCurrentConnection(SelectionKey key) {
+      if (key != null) {
+        Connection c = (Connection)key.attachment();
+        if (c != null) {
+          if (LOG.isTraceEnabled())
+            LOG.trace(getName() + ": disconnecting client " + c.getHostAddress());
+          closeConnection(c);
         }
       }
     }
@@ -862,6 +938,69 @@ public abstract class HBaseServer {
     }
   }
 
+  /**
+   * This class is used to delay parsing of incoming requests. It is a storage for
+   * ByteBuffer and timestamp. To parse raw ByteBuffer to get Call object with timestamp
+   * equal to read time, just call parse method.
+   */
+  private class RawCall {
+    private final ByteBuffer data;
+    private final Connection connection;
+    private final long timestamp; // When RawCall was created;
+
+    public RawCall(Connection connection, ByteBuffer data){
+      this.connection = connection;
+      this.data = data;
+      this.timestamp = System.currentTimeMillis();
+    }
+
+    public Call parse() throws IOException{
+      DataInputStream uncompressedIs =
+          new DataInputStream(new ByteArrayInputStream(data.array()));
+      Compression.Algorithm txCompression = Algorithm.NONE;
+      Compression.Algorithm rxCompression = Algorithm.NONE;
+      DataInputStream dis = uncompressedIs;
+
+      // 1. read the call id uncompressed
+      int id = uncompressedIs.readInt();
+        if (LOG.isTraceEnabled())
+          LOG.trace(" got #" + id);
+
+        HBaseRPCOptions options = new HBaseRPCOptions ();
+        Decompressor decompressor = null;
+        if (connection.version >= VERSION_RPCOPTIONS) {
+          // 2. read rpc options uncompressed
+          options.readFields(dis);
+          txCompression = options.getTxCompression();   // server receives this
+          rxCompression = options.getRxCompression();   // server responds with
+          // 3. set up a decompressor to read the rest of the request
+          if (txCompression != Compression.Algorithm.NONE) {
+            decompressor = txCompression.getDecompressor();
+            InputStream is = txCompression.createDecompressionStream(
+                uncompressedIs, decompressor, 0);
+            dis = new DataInputStream(is);
+          }
+        }
+        // 4. read the rest of the params
+        Writable param = ReflectionUtils.newInstance(paramClass, conf);
+        param.readFields(dis);
+
+        Call call = new Call(id, param, connection, timestamp);
+        call.shouldProfile = options.getRequestProfiling ();
+
+        call.setRPCCompression(rxCompression);
+        call.setVersion(connection.version);
+        call.setTag(options.getTag());
+
+        if (decompressor != null) {
+          txCompression.returnDecompressor(decompressor);
+        }
+
+        return call;
+    }
+  }
+
+
   /** Reads calls from a connection and queues them for handling. */
   private class Connection {
     private boolean versionRead = false; //if initial signature and
@@ -949,7 +1088,7 @@ public abstract class HBaseServer {
       return isIdle() && currentTime - lastContact > maxIdleTime;
     }
 
-    public int readAndProcess() throws IOException, InterruptedException {
+    public int readAndQueue() throws IOException, InterruptedException {
       while (true) {
         /* Read at most one RPC. If the header is not read completely yet
          * then iterate until we read first RPC or until there is no data left.
@@ -1005,7 +1144,7 @@ public abstract class HBaseServer {
           dataLengthBuffer.clear();
           data.flip();
           if (headerRead) {
-            processData();
+            queueRawCall();
             data = null;
             return count;
           }
@@ -1018,6 +1157,10 @@ public abstract class HBaseServer {
       }
     }
 
+    private void queueRawCall() throws InterruptedException {
+      callQueue.put(new RawCall(this, data));              // queue the call; maybe blocked here
+    }
+
     /// Reads the header following version
     private void processHeader() throws IOException {
       /* In the current version, it is just a ticket.
@@ -1028,50 +1171,6 @@ public abstract class HBaseServer {
       ticket = (UserGroupInformation) ObjectWritable.readObject(in, conf);
     }
 
-    private void processData() throws  IOException, InterruptedException {
-      DataInputStream uncompressedIs =
-        new DataInputStream(new ByteArrayInputStream(data.array()));
-      Compression.Algorithm txCompression = Algorithm.NONE;
-      Compression.Algorithm rxCompression = Algorithm.NONE;
-      DataInputStream dis = uncompressedIs;
-
-      // 1. read the call id uncompressed
-      int id = uncompressedIs.readInt();
-      if (LOG.isTraceEnabled())
-        LOG.trace(" got #" + id);
-      
-      HBaseRPCOptions options = new HBaseRPCOptions ();
-      Decompressor decompressor = null;
-      if (version >= VERSION_RPCOPTIONS) {
-        // 2. read rpc options uncompressed
-        options.readFields(dis);
-        txCompression = options.getTxCompression();   // server receives this
-        rxCompression = options.getRxCompression();   // server responds with
-        // 3. set up a decompressor to read the rest of the request
-        if (txCompression != Compression.Algorithm.NONE) {
-          decompressor = txCompression.getDecompressor();
-          InputStream is = txCompression.createDecompressionStream(
-              uncompressedIs, decompressor, 0);
-          dis = new DataInputStream(is);
-        }
-      }
-      // 4. read the rest of the params
-      Writable param = ReflectionUtils.newInstance(paramClass, conf);
-      param.readFields(dis);
-
-      Call call = new Call(id, param, this);
-      call.shouldProfile = options.getRequestProfiling ();
-      
-      call.setRPCCompression(rxCompression);
-      call.setVersion(version);
-      call.setTag(options.getTag());
-      callQueue.put(call);              // queue the call; maybe blocked here
-      
-      if (decompressor != null) {
-        txCompression.returnDecompressor(decompressor);
-      }
-    }
-
     protected synchronized void close() {
       closed = true;
       data = null;
@@ -1107,7 +1206,7 @@ public abstract class HBaseServer {
       while (running) {
         try {
           status.pause("Waiting for a call");
-          Call call = callQueue.take(); // pop the queue; maybe blocked here
+          Call call = callQueue.take().parse(); // pop the queue; maybe blocked here
           status.setStatus("Setting up call");
           status.setConnection(call.connection.getHostAddress(),
               call.connection.getRemotePort());
@@ -1261,7 +1360,7 @@ public abstract class HBaseServer {
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
-    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize);
+    this.callQueue  = new LinkedBlockingQueue<RawCall>(maxQueueSize);
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);