You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/12/10 02:55:33 UTC
svn commit: r1212711 - in
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase:
thrift/HBaseThreadPoolServer.java thrift/ThriftServer.java util/Threads.java
Author: nspiegelberg
Date: Sat Dec 10 01:55:32 2011
New Revision: 1212711
URL: http://svn.apache.org/viewvc?rev=1212711&view=rev
Log:
HBASE-4863 Fix thread leaks in the HBase thread pool server
Summary: This is part of a fix to solve Thrift server problems observed on Eris
and Hashout clusters. A recent jstack captured by @rthiessen on Eris at the time
of a Thrift server crash had over 15,000 Thrift worker threads. We need to bound
that number with a configuration knob and set it reasonably.
Test Plan: Run Thrift server on a 5-node cluster, open a lot of connections,
and monitor the number of worker threads using jstack.
Reviewers: kannan, kranganathan, rthiessen, nspiegelberg
Reviewed By: kannan
CC: hbase-eng@lists, alasla, davejwatson, nspiegelberg, kannan, kranganathan,
mbautin, cgthayer, pkhemani, gqchen
Differential Revision: 364343
Revert Plan: OK
Task ID: 762593
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java?rev=1212711&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HBaseThreadPoolServer.java Sat Dec 10 01:55:32 2011
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.thrift.TException;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TProcessorFactory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A thread pool server customized for HBase.
+ */
+public class HBaseThreadPoolServer extends TServer {
+
+  private static final Log LOG = LogFactory.getLog(
+      HBaseThreadPoolServer.class.getName());
+
+  /** Logged when the executor refuses a newly accepted connection. */
+  private static final String QUEUE_FULL_MSG =
+      "Queue is full, closing connection";
+
+  /** Configuration key for the core (minimum) number of worker threads. */
+  public static final String MIN_WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.minWorkerThreads";
+
+  /**
+   * Default core pool size. It should be enough for many test scenarios, but
+   * a large-scale production setup will want to override it with a much
+   * larger number (e.g. at least 200).
+   */
+  public static final int DEFAULT_MIN_WORKER_THREADS = 16;
+
+  /** Configuration key for the upper bound on worker threads. */
+  public static final String MAX_WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.maxWorkerThreads";
+
+  /** Default upper bound on worker threads. */
+  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
+
+  /** Configuration key for the capacity of the pending-connection queue. */
+  public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
+      "hbase.thrift.maxQueuedRequests";
+
+  /** Default capacity of the pending-connection queue. */
+  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
+
+  /**
+   * How long, in milliseconds, to wait after all worker threads have been
+   * interrupted. This only applies once a clean shutdown has already been
+   * attempted.
+   */
+  public static final int SHUTDOWN_NOW_TIME_MS = 5000;
+
+  /**
+   * Thread pool settings for {@link HBaseThreadPoolServer}, populated from a
+   * {@link Configuration}. Extends the stock Thrift options with a bound on
+   * the number of queued requests.
+   */
+  public static class Options extends TThreadPoolServer.Options {
+    /**
+     * Capacity of the queue holding connections that wait for a worker. A
+     * value that is not positive makes the server use a direct hand-off
+     * instead of a queue.
+     */
+    public int maxQueuedRequests;
+
+    /**
+     * Reads the worker thread and queue limits from the given configuration,
+     * falling back to the {@code DEFAULT_*} constants for missing keys.
+     *
+     * @param conf configuration to read the {@code hbase.thrift.*} keys from
+     */
+    public Options(Configuration conf) {
+      this.minWorkerThreads = conf.getInt(
+          MIN_WORKER_THREADS_CONF_KEY, DEFAULT_MIN_WORKER_THREADS);
+      this.maxWorkerThreads = conf.getInt(
+          MAX_WORKER_THREADS_CONF_KEY, DEFAULT_MAX_WORKER_THREADS);
+      this.maxQueuedRequests = conf.getInt(
+          MAX_QUEUED_REQUESTS_CONF_KEY, DEFAULT_MAX_QUEUED_REQUESTS);
+    }
+  }
+
+  /** Executor service for handling client connections */
+  private final ExecutorService executorService;
+
+  /**
+   * Flag for stopping the server. Volatile because it is set by stop() and
+   * read by the accept loop and by every worker thread.
+   */
+  private volatile boolean stopped;
+
+  /** Options the server was built with; supplies the graceful stop timeout. */
+  private final Options serverOptions;
+
+  /** Keep-alive time, in seconds, handed to the worker thread pool. */
+  private static final int KEEP_ALIVE_TIME_SEC = 60;
+
+  /**
+   * Builds a server that runs each accepted connection on a pool of daemon
+   * threads named {@code thrift-worker-N}.
+   *
+   * @param processor processor that handles every request
+   * @param serverTransport transport to listen on and accept clients from
+   * @param transportFactory used for both the input and the output transport
+   * @param protocolFactory used for both the input and the output protocol
+   * @param options worker thread limits, queue size and stop timeout
+   */
+  public HBaseThreadPoolServer(TProcessor processor,
+      TServerTransport serverTransport,
+      TTransportFactory transportFactory,
+      TProtocolFactory protocolFactory,
+      Options options) {
+    super(new TProcessorFactory(processor), serverTransport, transportFactory,
+        transportFactory, protocolFactory, protocolFactory);
+
+    // A positive limit yields a bounded FIFO queue; otherwise connections are
+    // handed straight to a worker with no buffering at all.
+    BlockingQueue<Runnable> pendingConnections =
+        options.maxQueuedRequests > 0
+            ? new LinkedBlockingQueue<Runnable>(options.maxQueuedRequests)
+            : new SynchronousQueue<Runnable>();
+
+    executorService = new ThreadPoolExecutor(
+        options.minWorkerThreads,
+        options.maxWorkerThreads,
+        KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS,
+        pendingConnections,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("thrift-worker-%d")
+            .build());
+    serverOptions = options;
+  }
+
+ public void serve() {
+ try {
+ serverTransport_.listen();
+ } catch (TTransportException ttx) {
+ LOG.error("Error occurred during listening.", ttx);
+ return;
+ }
+
+ Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook") {
+ @Override
+ public void run() {
+ HBaseThreadPoolServer.this.stop();
+ }
+ });
+
+ stopped = false;
+ while (!stopped && !Thread.interrupted()) {
+ TTransport client = null;
+ try {
+ client = serverTransport_.accept();
+ } catch (TTransportException ttx) {
+ if (!stopped) {
+ LOG.warn("Transport error when accepting message", ttx);
+ continue;
+ } else {
+ // The server has been stopped
+ break;
+ }
+ }
+
+ ClientConnnection command = new ClientConnnection(client);
+ try {
+ executorService.execute(command);
+ } catch (RejectedExecutionException rex) {
+ if (client.getClass() == TSocket.class) {
+ // We expect the client to be TSocket.
+ LOG.warn(QUEUE_FULL_MSG + " from " +
+ ((TSocket) client).getSocket().getRemoteSocketAddress());
+ } else {
+ LOG.warn(QUEUE_FULL_MSG, rex);
+ }
+ client.close();
+ }
+ }
+
+ shutdownServer();
+ }
+
+ /**
+ * Loop until {@link ExecutorService#awaitTermination} finally does return
+ * without an interrupted exception. If we don't do this, then we'll shut
+ * down prematurely. We want to let the executor service clear its task
+ * queue, closing client sockets appropriately.
+ */
+ private void shutdownServer() {
+ executorService.shutdown();
+
+ long msLeftToWait =
+ serverOptions.stopTimeoutUnit.toMillis(serverOptions.stopTimeoutVal);
+ long timeMillis = System.currentTimeMillis();
+
+ LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" +
+ " pending requests");
+ boolean interrupted = false;
+ while (msLeftToWait >= 0) {
+ try {
+ executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
+ break;
+ } catch (InterruptedException ix) {
+ long timePassed = System.currentTimeMillis() - timeMillis;
+ msLeftToWait -= timePassed;
+ timeMillis += timePassed;
+ interrupted = true;
+ }
+ }
+
+ LOG.info("Interrupting all worker threads and waiting for "
+ + SHUTDOWN_NOW_TIME_MS + " ms longer");
+
+ // This will interrupt all the threads, even those running a task.
+ executorService.shutdownNow();
+ Threads.sleepWithoutInterrupt(SHUTDOWN_NOW_TIME_MS);
+
+ // Preserve the interrupted status.
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ LOG.info("Thrift server shutdown complete");
+ }
+
+ /**
+ * Requests that the server stop: raises the stop flag that the accept loop
+ * in serve() and the per-connection worker loops check, then interrupts the
+ * server transport.
+ */
+ @Override
+ public void stop() {
+ // Set the flag first: serve() consults it when accept() fails, to tell a
+ // requested stop apart from a genuine transport error.
+ stopped = true;
+ serverTransport_.interrupt();
+ }
+
+ private class ClientConnnection implements Runnable {
+
+ private TTransport client;
+
+ /**
+ * Default constructor.
+ *
+ * @param client Transport to process
+ */
+ private ClientConnnection(TTransport client) {
+ this.client = client;
+ }
+
+ /**
+ * Loops on processing a client forever
+ */
+ public void run() {
+ TProcessor processor = null;
+ TTransport inputTransport = null;
+ TTransport outputTransport = null;
+ TProtocol inputProtocol = null;
+ TProtocol outputProtocol = null;
+ try {
+ processor = processorFactory_.getProcessor(client);
+ inputTransport = inputTransportFactory_.getTransport(client);
+ outputTransport = outputTransportFactory_.getTransport(client);
+ inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
+ // we check stopped_ first to make sure we're not supposed to be shutting
+ // down. this is necessary for graceful shutdown.
+ while (!stopped && processor.process(inputProtocol, outputProtocol)) {}
+ } catch (TTransportException ttx) {
+ // Assume the client died and continue silently
+ } catch (TException tx) {
+ LOG.error("Thrift error occurred during processing of message.", tx);
+ } catch (Exception x) {
+ LOG.error("Error occurred during processing of message.", x);
+ }
+
+ if (inputTransport != null) {
+ inputTransport.close();
+ }
+
+ if (outputTransport != null) {
+ outputTransport.close();
+ }
+ }
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1212711&r1=1212710&r2=1212711&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Sat Dec 10 01:55:32 2011
@@ -18,6 +18,17 @@
package org.apache.hadoop.hbase.thrift;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
@@ -35,7 +46,6 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -46,9 +56,9 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
@@ -69,7 +79,6 @@ import org.apache.thrift.protocol.TProto
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -77,23 +86,15 @@ import org.apache.thrift.transport.TServ
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
/**
* ThriftServer - this class starts up a Thrift server which implements the
* Hbase API specified in the Hbase.thrift IDL file.
*/
public class ThriftServer {
+ private static final Class<? extends TServer>
+ THREAD_POOL_SERVER_CLASS = HBaseThreadPoolServer.class;
+
/**
* The HBaseHandler is a glue object that connects Thrift RPC calls to the
* HBase client API primarily defined in the HBaseAdmin and HTable objects.
@@ -1063,16 +1064,25 @@ public class ThriftServer {
options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information");
+ options.addOption("m", "minWorkers", true, "The minimum number of worker " +
+ "threads for " + THREAD_POOL_SERVER_CLASS.getSimpleName());
+ options.addOption("w", "workers", true, "The maximum number of worker " +
+ "threads for " + THREAD_POOL_SERVER_CLASS.getSimpleName());
+ options.addOption("q", "queue", true, "The maximum number of queued " +
+ "requests in " + THREAD_POOL_SERVER_CLASS.getSimpleName());
OptionGroup servers = new OptionGroup();
servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
- servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
+ servers.addOption(new Option("threadpool", false, "Use "
+ + THREAD_POOL_SERVER_CLASS.getSimpleName() + ". This is the default."));
options.addOptionGroup(servers);
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
+ Configuration conf = HBaseConfiguration.create();
+
/**
* This is so complicated to please both bin/hbase and bin/hbase-daemon.
* hbase-daemon provides "start" and "stop" arguments
@@ -1094,6 +1104,26 @@ public class ThriftServer {
printUsageAndExit(options, -1);
}
+ // Make optional changes to the configuration based on command-line options
+ if (cmd.hasOption("minWorkers")) {
+ conf.set(HBaseThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
+ cmd.getOptionValue("minWorkers"));
+ }
+
+ if (cmd.hasOption("workers")) {
+ conf.set(HBaseThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+ cmd.getOptionValue("workers"));
+ }
+
+ if (cmd.hasOption("queue")) {
+ conf.set(HBaseThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+ cmd.getOptionValue("queue"));
+ }
+
+ // Only instantiate this when finished modifying the configuration
+ HBaseThreadPoolServer.Options serverOptions =
+ new HBaseThreadPoolServer.Options(conf);
+
// Construct correct ProtocolFactory
TProtocolFactory protocolFactory;
if (cmd.hasOption("compact")) {
@@ -1104,8 +1134,7 @@ public class ThriftServer {
protocolFactory = new TBinaryProtocol.Factory();
}
- HBaseHandler handler = new HBaseHandler(
- HBaseConfiguration.create());
+ HBaseHandler handler = new HBaseHandler(conf);
Hbase.Processor processor = new Hbase.Processor(handler);
TServer server;
@@ -1150,8 +1179,23 @@ public class ThriftServer {
transportFactory = new TTransportFactory();
}
- LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort));
- server = new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory);
+ LOG.info("starting " + THREAD_POOL_SERVER_CLASS.getSimpleName() + " on "
+ + listenAddress + ":" + Integer.toString(listenPort)
+ + "; minimum number of worker threads="
+ + serverOptions.minWorkerThreads
+ + ", maximum number of worker threads="
+ + serverOptions.maxWorkerThreads + ", queued requests="
+ + serverOptions.maxQueuedRequests);
+
+ server = new HBaseThreadPoolServer(processor, serverTransport,
+ transportFactory, protocolFactory, serverOptions);
+
+ if (server.getClass() != THREAD_POOL_SERVER_CLASS) {
+ // A sanity check that we instantiated the right thing.
+ throw new RuntimeException("Expected thread pool server class " +
+ THREAD_POOL_SERVER_CLASS.getName() + " but got " +
+ server.getClass().getName());
+ }
}
server.serve();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1212711&r1=1212710&r2=1212711&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Sat Dec 10 01:55:32 2011
@@ -25,6 +25,7 @@ import java.io.PrintWriter;
import org.apache.hadoop.util.ReflectionUtils;
import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.TimeUnit;
/**
* Thread Utility
@@ -119,4 +120,28 @@ public class Threads {
e.printStackTrace();
}
}
+
+  /**
+   * Sleeps for the given amount of time even if interrupted. Preserves
+   * the interrupt status.
+   *
+   * The remaining time is recomputed after every sleep attempt, whether it
+   * completed or was interrupted, so the method returns once the requested
+   * time has elapsed. A value that is zero or negative returns immediately.
+   *
+   * @param msToWait the amount of time to sleep in milliseconds
+   */
+  public static void sleepWithoutInterrupt(long msToWait) {
+    // nanoTime is used for measuring elapsed time because it is not affected
+    // by adjustments of the wall clock.
+    final long startNanos = System.nanoTime();
+    long remainingMs = msToWait;
+    boolean interrupted = false;
+    while (remainingMs > 0) {
+      try {
+        Thread.sleep(remainingMs);
+      } catch (InterruptedException ex) {
+        interrupted = true;
+      }
+      remainingMs = msToWait
+          - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
}