You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/12/10 02:55:33 UTC

svn commit: r1212711 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: thrift/HBaseThreadPoolServer.java thrift/ThriftServer.java util/Threads.java

Author: nspiegelberg
Date: Sat Dec 10 01:55:32 2011
New Revision: 1212711

URL: http://svn.apache.org/viewvc?rev=1212711&view=rev
Log:
HBASE-4863 Fix thread leaks in the HBase thread pool server

Summary: This is part of a fix for Thrift server problems observed on the Eris
and Hashout clusters. A recent jstack captured by @rthiessen on Eris at the time
of a Thrift server crash showed over 15,000 Thrift worker threads. We need to
bound that number with a configuration knob and give it a reasonable default.

Test Plan: Run Thrift server on a 5-node cluster, open a lot of connections,
and monitor the number of worker threads using jstack.

Reviewers: kannan, kranganathan, rthiessen, nspiegelberg

Reviewed By: kannan

CC: hbase-eng@lists, alasla, davejwatson, nspiegelberg, kannan, kranganathan,
mbautin, cgthayer, pkhemani, gqchen

Differential Revision: 364343

Revert Plan: OK

Task ID: 762593

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java?rev=1212711&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java Sat Dec 10 01:55:32 2011
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A thread pool server customized for HBase.
+ * <p>
+ * Every accepted client connection becomes a single task on a bounded
+ * {@link ThreadPoolExecutor}; that task serves the connection until the
+ * client goes away or the server is stopped. When the executor rejects a
+ * connection (all workers busy and the queue full), the connection is closed
+ * right away instead of spawning another thread.
+ */
+public class HBaseThreadPoolServer extends TServer {
+
+  private static final String QUEUE_FULL_MSG =
+      "Queue is full, closing connection";
+
+  /**
+   * This default core pool size should be enough for many test scenarios. We
+   * want to override this with a much larger number (e.g. at least 200) for a
+   * large-scale production setup.
+   */
+  public static final int DEFAULT_MIN_WORKER_THREADS = 16;
+
+  /** Default upper bound on the number of worker threads. */
+  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
+
+  /**
+   * Default capacity of the connection queue. A non-positive configured value
+   * means no queueing at all (a {@link SynchronousQueue} is used).
+   */
+  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
+
+  public static final String MIN_WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.minWorkerThreads";
+
+  public static final String MAX_WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.maxWorkerThreads";
+
+  public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
+      "hbase.thrift.maxQueuedRequests";
+
+  private static final Log LOG = LogFactory.getLog(
+      HBaseThreadPoolServer.class.getName());
+
+  /**
+   * Time to wait after interrupting all worker threads. This is after a clean
+   * shutdown has been attempted.
+   */
+  public static final int SHUTDOWN_NOW_TIME_MS = 5000;
+
+  /** Idle time after which worker threads above the core size exit. */
+  private static final int KEEP_ALIVE_TIME_SEC = 60;
+
+  /**
+   * Server options read from a {@link Configuration}: worker thread bounds
+   * and the maximum number of queued connections.
+   */
+  public static class Options extends TThreadPoolServer.Options {
+    /** Capacity of the connection queue; non-positive means no queue. */
+    public int maxQueuedRequests;
+
+    public Options(Configuration conf) {
+      super();
+      minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
+          DEFAULT_MIN_WORKER_THREADS);
+      maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
+          DEFAULT_MAX_WORKER_THREADS);
+      maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
+          DEFAULT_MAX_QUEUED_REQUESTS);
+    }
+  }
+
+  /** Executor service for handling client connections */
+  private final ExecutorService executorService;
+
+  /** Flag for stopping the server */
+  private volatile boolean stopped;
+
+  private final Options serverOptions;
+
+  /**
+   * Creates a server whose worker pool is sized and bounded by
+   * {@code options}. The same transport and protocol factories are used for
+   * both input and output.
+   */
+  public HBaseThreadPoolServer(TProcessor processor,
+      TServerTransport serverTransport,
+      TTransportFactory transportFactory,
+      TProtocolFactory protocolFactory,
+      Options options) {
+    super(new TProcessorFactory(processor), serverTransport, transportFactory,
+        transportFactory, protocolFactory, protocolFactory);
+
+    // NOTE: ThreadPoolExecutor only starts threads beyond minWorkerThreads
+    // once the queue is full. Each task occupies a worker for the whole life
+    // of a connection, so connections queued behind busy core threads wait
+    // until a worker frees up; size minWorkerThreads accordingly.
+    BlockingQueue<Runnable> executorQueue;
+    if (options.maxQueuedRequests > 0) {
+      executorQueue = new LinkedBlockingQueue<Runnable>(
+          options.maxQueuedRequests);
+    } else {
+      executorQueue = new SynchronousQueue<Runnable>();
+    }
+
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setDaemon(true);
+    tfb.setNameFormat("thrift-worker-%d");
+    executorService =
+        new ThreadPoolExecutor(options.minWorkerThreads,
+            options.maxWorkerThreads, KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS,
+            executorQueue, tfb.build());
+    serverOptions = options;
+  }
+
+  /**
+   * Listens on the server transport and hands every accepted connection to
+   * the worker pool until {@link #stop()} is called or this thread is
+   * interrupted, then shuts the pool down. Returns without serving if the
+   * transport cannot listen.
+   */
+  @Override
+  public void serve() {
+    try {
+      serverTransport_.listen();
+    } catch (TTransportException ttx) {
+      LOG.error("Error occurred during listening.", ttx);
+      return;
+    }
+
+    Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook") {
+      @Override
+      public void run() {
+        HBaseThreadPoolServer.this.stop();
+      }
+    });
+
+    stopped = false;
+    // isInterrupted() (unlike Thread.interrupted()) leaves the interrupt
+    // status set, so shutdownServer() and our caller can still observe it.
+    while (!stopped && !Thread.currentThread().isInterrupted()) {
+      TTransport client;
+      try {
+        client = serverTransport_.accept();
+      } catch (TTransportException ttx) {
+        if (stopped) {
+          // The server has been stopped
+          break;
+        }
+        LOG.warn("Transport error when accepting message", ttx);
+        continue;
+      }
+
+      try {
+        executorService.execute(new ClientConnection(client));
+      } catch (RejectedExecutionException rex) {
+        if (client.getClass() == TSocket.class) {
+          // We expect the client to be TSocket.
+          LOG.warn(QUEUE_FULL_MSG + " from " +
+              ((TSocket) client).getSocket().getRemoteSocketAddress());
+        } else {
+          LOG.warn(QUEUE_FULL_MSG, rex);
+        }
+        client.close();
+      }
+    }
+
+    shutdownServer();
+  }
+
+  /**
+   * Shuts the worker pool down. Workers first get the options' stop timeout
+   * ({@code stopTimeoutVal} in {@code stopTimeoutUnit}) to finish on their
+   * own; only if the pool is still running after that are they interrupted
+   * and given {@link #SHUTDOWN_NOW_TIME_MS} more. Interrupts received while
+   * waiting do not cut the waits short, but this thread's interrupt status is
+   * restored before returning.
+   */
+  private void shutdownServer() {
+    executorService.shutdown();
+
+    long msToWait =
+        serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
+    LOG.info("Waiting for up to " + msToWait + " ms to finish processing" +
+        " pending requests");
+
+    if (!awaitTerminationUninterruptibly(msToWait)) {
+      LOG.info("Interrupting all worker threads and waiting for "
+          + SHUTDOWN_NOW_TIME_MS + " ms longer");
+      // This will interrupt all the threads, even those running a task.
+      executorService.shutdownNow();
+      if (!awaitTerminationUninterruptibly(SHUTDOWN_NOW_TIME_MS)) {
+        LOG.warn("Some worker threads are still running after shutdown");
+      }
+    }
+    LOG.info("Thrift server shutdown complete");
+  }
+
+  /**
+   * Waits up to {@code msToWait} ms for the worker pool to terminate. If this
+   * thread is interrupted, the wait is resumed for the remaining time and the
+   * interrupt status is restored before returning.
+   *
+   * @param msToWait maximum time to wait, in milliseconds
+   * @return true if the pool terminated within the time limit
+   */
+  private boolean awaitTerminationUninterruptibly(long msToWait) {
+    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(msToWait);
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try {
+          return executorService.awaitTermination(
+              deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
+        } catch (InterruptedException ix) {
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Asks {@link #serve()} to stop accepting connections and makes connection
+   * handlers exit before starting another request. Only sets a flag and
+   * interrupts the server transport; it does not wait for the shutdown.
+   */
+  @Override
+  public void stop() {
+    stopped = true;
+    serverTransport_.interrupt();
+  }
+
+  /** Serves a single client connection on a worker thread. */
+  private class ClientConnection implements Runnable {
+
+    private final TTransport client;
+
+    /**
+     * @param client Transport to process
+     */
+    private ClientConnection(TTransport client) {
+      this.client = client;
+    }
+
+    /**
+     * Processes requests from the client until the server is stopped, the
+     * processor returns false, or an exception is thrown, and then closes the
+     * transports.
+     */
+    @Override
+    public void run() {
+      TTransport inputTransport = null;
+      TTransport outputTransport = null;
+      try {
+        TProcessor processor = processorFactory_.getProcessor(client);
+        inputTransport = inputTransportFactory_.getTransport(client);
+        outputTransport = outputTransportFactory_.getTransport(client);
+        TProtocol inputProtocol =
+            inputProtocolFactory_.getProtocol(inputTransport);
+        TProtocol outputProtocol =
+            outputProtocolFactory_.getProtocol(outputTransport);
+        // Check stopped first so that no new request is started once the
+        // server is shutting down. This is necessary for graceful shutdown.
+        while (!stopped && processor.process(inputProtocol, outputProtocol)) {}
+      } catch (TTransportException ttx) {
+        // Assume the client died and continue silently
+      } catch (TException tx) {
+        LOG.error("Thrift error occurred during processing of message.", tx);
+      } catch (Exception x) {
+        LOG.error("Error occurred during processing of message.", x);
+      } finally {
+        // Close in finally so that an Error thrown above cannot leak the
+        // connection.
+        if (inputTransport != null) {
+          inputTransport.close();
+        } else {
+          // No wrapping transport was created; close the raw client.
+          client.close();
+        }
+        if (outputTransport != null) {
+          outputTransport.close();
+        }
+      }
+    }
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1212711&r1=1212710&r2=1212711&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Sat Dec 10 01:55:32 2011
@@ -18,6 +18,17 @@
 
 package org.apache.hadoop.hbase.thrift;
 
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -35,7 +46,6 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -46,9 +56,9 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
@@ -69,7 +79,6 @@ import org.apache.thrift.protocol.TProto
 import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -77,23 +86,15 @@ import org.apache.thrift.transport.TServ
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TTransportFactory;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
 /**
  * ThriftServer - this class starts up a Thrift server which implements the
  * Hbase API specified in the Hbase.thrift IDL file.
  */
 public class ThriftServer {
 
+  private static final Class<? extends TServer>
+      THREAD_POOL_SERVER_CLASS = HBaseThreadPoolServer.class;
+
   /**
    * The HBaseHandler is a glue object that connects Thrift RPC calls to the
    * HBase client API primarily defined in the HBaseAdmin and HTable objects.
@@ -1063,16 +1064,25 @@ public class ThriftServer {
     options.addOption("f", "framed", false, "Use framed transport");
     options.addOption("c", "compact", false, "Use the compact protocol");
     options.addOption("h", "help", false, "Print help information");
+    options.addOption("m", "minWorkers", true, "The minimum number of worker " +
+        "threads for " + THREAD_POOL_SERVER_CLASS.getSimpleName());
+    options.addOption("w", "workers", true, "The maximum number of worker " +
+        "threads for " + THREAD_POOL_SERVER_CLASS.getSimpleName());
+    options.addOption("q", "queue", true, "The maximum number of queued " +
+        "requests in " + THREAD_POOL_SERVER_CLASS.getSimpleName());
 
     OptionGroup servers = new OptionGroup();
     servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
-    servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
+    servers.addOption(new Option("threadpool", false, "Use "
+        + THREAD_POOL_SERVER_CLASS.getSimpleName() + ". This is the default."));
     options.addOptionGroup(servers);
 
     CommandLineParser parser = new PosixParser();
     CommandLine cmd = parser.parse(options, args);
 
+    Configuration conf = HBaseConfiguration.create();
+
     /**
      * This is so complicated to please both bin/hbase and bin/hbase-daemon.
      * hbase-daemon provides "start" and "stop" arguments
@@ -1094,6 +1104,26 @@ public class ThriftServer {
       printUsageAndExit(options, -1);
     }
 
+    // Make optional changes to the configuration based on command-line options
+    if (cmd.hasOption("minWorkers")) {
+      conf.set(HBaseThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
+          cmd.getOptionValue("minWorkers"));
+    }
+
+    if (cmd.hasOption("workers")) {
+      conf.set(HBaseThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+          cmd.getOptionValue("workers"));
+    }
+
+    if (cmd.hasOption("queue")) {
+      conf.set(HBaseThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+          cmd.getOptionValue("queue"));
+    }
+
+    // Only instantiate this when finished modifying the configuration
+    HBaseThreadPoolServer.Options serverOptions =
+      new HBaseThreadPoolServer.Options(conf);
+
     // Construct correct ProtocolFactory
     TProtocolFactory protocolFactory;
     if (cmd.hasOption("compact")) {
@@ -1104,8 +1134,7 @@ public class ThriftServer {
       protocolFactory = new TBinaryProtocol.Factory();
     }
 
-    HBaseHandler handler = new HBaseHandler(
-        HBaseConfiguration.create());
+    HBaseHandler handler = new HBaseHandler(conf);
     Hbase.Processor processor = new Hbase.Processor(handler);
 
     TServer server;
@@ -1150,8 +1179,23 @@ public class ThriftServer {
         transportFactory = new TTransportFactory();
       }
 
-      LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort));
-      server = new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory);
+      LOG.info("starting " + THREAD_POOL_SERVER_CLASS.getSimpleName() + " on "
+          + listenAddress + ":" + Integer.toString(listenPort)
+          + "; minimum number of worker threads="
+          + serverOptions.minWorkerThreads
+          + ", maximum number of worker threads="
+          + serverOptions.maxWorkerThreads + ", queued requests="
+          + serverOptions.maxQueuedRequests);
+
+      server = new HBaseThreadPoolServer(processor, serverTransport,
+          transportFactory, protocolFactory, serverOptions);
+
+      if (server.getClass() != THREAD_POOL_SERVER_CLASS) {
+        // A sanity check that we instantiated the right thing.
+        throw new RuntimeException("Expected thread pool server class " +
+            THREAD_POOL_SERVER_CLASS.getName() + " but got " +
+            server.getClass().getName());
+      }
     }
 
     server.serve();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1212711&r1=1212710&r2=1212711&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Sat Dec 10 01:55:32 2011
@@ -25,6 +25,7 @@ import java.io.PrintWriter;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Thread Utility
@@ -119,4 +120,28 @@ public class Threads {
       e.printStackTrace();
     }
   }
+
+  /**
+   * Sleeps for the given time even if interrupted; any interrupt received
+   * is re-asserted on return. Non-positive values return immediately.
+   * @param msToWait the amount of time to sleep in milliseconds
+   */
+  public static void sleepWithoutInterrupt(long msToWait) {
+    long nanosLeft = TimeUnit.MILLISECONDS.toNanos(msToWait);
+    final long deadline = System.nanoTime() + nanosLeft;
+    boolean interrupted = false;
+    while (nanosLeft > 0) {
+      try {
+        TimeUnit.NANOSECONDS.sleep(nanosLeft);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+      // Recompute after every wake-up, interrupted or not.
+      nanosLeft = deadline - System.nanoTime();
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
 }