You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/04/21 03:37:16 UTC
svn commit: r1328562 [1/9] - in /hbase/branches/0.89-fb: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/filter/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/thrift/ src/main/...
Author: mbautin
Date: Sat Apr 21 01:37:15 2012
New Revision: 1328562
URL: http://svn.apache.org/viewvc?rev=1328562&view=rev
Log:
[jira] [HBASE-5803] [89-fb] Upgrade hbase 0.89-fb to Thrift 0.8.0 and bring Thrift server enhancements from trunk
Summary: TBoundedThreadPoolServer has been a problem for us when there is a
large number of clients. We need to migrate to 0.8.0. in 89-fb and bring the
relevant improvements from trunk, including supporting TThreadedSelectorServer.
Test Plan:
- Run unit tests
- Deploy to a test cluster, turn on the load, and monitor
correctness/performance
Reviewers: schen, nspiegelberg, kannan, liyintang
Reviewed By: schen
Differential Revision: https://reviews.facebook.net/D2811
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
- copied, changed from r1328561, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHsHaServerCmdLine.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestNonblockingServerCmdLine.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThreadPoolServerFramedCmdLine.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThreadPoolServerUnframedCmdLine.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThreadedSelectorServerCmdLine.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerLegacy.java
- copied, changed from r1328561, hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/ThriftServerCmdLineTestBase.java
Modified:
hbase/branches/0.89-fb/pom.xml
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/AlreadyExists.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/BatchMutation.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IOError.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IllegalArgument.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/Mutation.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TCell.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRegionInfo.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRowResult.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TScan.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
Modified: hbase/branches/0.89-fb/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/pom.xml?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/pom.xml (original)
+++ hbase/branches/0.89-fb/pom.xml Sat Apr 21 01:37:15 2012
@@ -491,7 +491,7 @@
<protobuf.version>2.3.0</protobuf.version>
<slf4j.version>1.5.8</slf4j.version>
<stax-api>1.0.1</stax-api>
- <thrift.version>0.2.0</thrift.version>
+ <thrift.version>0.8.0</thrift.version>
<guava.version>r09</guava.version>
</properties>
@@ -627,7 +627,7 @@
-->
<dependency>
<groupId>org.apache.thrift</groupId>
- <artifactId>thrift</artifactId>
+ <artifactId>libthrift</artifactId>
<version>${thrift.version}</version>
<exclusions>
<exclusion>
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Apr 21 01:37:15 2012
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase;
+import java.nio.ByteBuffer;
+
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
@@ -302,16 +304,22 @@ public final class HConstants {
*/
public static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
+ public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
+
/**
* Used by scanners, etc when they want to start at the beginning of a region
*/
public static final byte [] EMPTY_START_ROW = EMPTY_BYTE_ARRAY;
+ public static final ByteBuffer EMPTY_START_ROW_BUF = ByteBuffer.wrap(EMPTY_START_ROW);
+
/**
* Last row in a table.
*/
public static final byte [] EMPTY_END_ROW = EMPTY_START_ROW;
+ public static final ByteBuffer EMPTY_END_ROW_BUF = ByteBuffer.wrap(EMPTY_END_ROW);
+
/**
* Used by scanners and others when they're trying to detect the end of a
* table
@@ -491,6 +499,35 @@ public final class HConstants {
public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop";
+ // Thrift server configuration options
+
+ /** Configuration key prefix for the stand-alone thrift proxy */
+ public static final String THRIFT_PROXY_PREFIX = "hbase.thrift.";
+
+ /** Configuration key prefix for thrift server embedded into the region server */
+ public static final String RS_THRIFT_PREFIX = "hbase.regionserver.thrift.";
+
+ /** Default port for the stand-alone thrift proxy */
+ public static final int DEFAULT_THRIFT_PROXY_PORT = 9090;
+
+ /** Default port for the thrift server embedded into regionserver */
+ public static final int DEFAULT_RS_THRIFT_SERVER_PORT = 9091;
+
+ /** Configuration key suffix for thrift server type (e.g. thread pool, nonblocking, etc.) */
+ public static final String THRIFT_SERVER_TYPE_SUFFIX = "server.type";
+
+ /** Configuration key suffix for the IP address for thrift server to bind to */
+ public static final String THRIFT_BIND_SUFFIX = "ipaddress";
+
+ /** Configuration key suffix for whether to use compact Thrift transport */
+ public static final String THRIFT_COMPACT_SUFFIX = "compact";
+
+ /** Configuration key suffix for whether to use framed Thrift transport */
+ public static final String THRIFT_FRAMED_SUFFIX = "framed";
+
+ /** Configuration key suffix for Thrift server port */
+ public static final String THRIFT_PORT_SUFFIX = "port";
+
private HConstants() {
// Can't be instantiated with this ctor.
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java Sat Apr 21 01:37:15 2012
@@ -19,26 +19,23 @@
*/
package org.apache.hadoop.hbase.filter;
-import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
-import java.util.TreeSet;
import java.util.ArrayList;
-import java.util.Stack;
+import java.util.Collections;
+import java.util.EmptyStackException;
import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
+import java.util.Stack;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.ParseConstants;
-
-import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import java.lang.ArrayIndexOutOfBoundsException;
-import java.lang.ClassCastException;
-import java.lang.reflect.*;
-import java.util.EmptyStackException;
+import org.apache.hadoop.hbase.util.Bytes;
/**
* This class allows a user to specify a filter via a string
@@ -51,6 +48,7 @@ import java.util.EmptyStackException;
*
*/
public class ParseFilter {
+ private static final Log LOG = LogFactory.getLog(ParseFilter.class);
private static HashMap<ByteBuffer, Integer> operatorPrecedenceHashMap;
private static HashMap<String, String> filterHashMap;
@@ -841,4 +839,27 @@ public class ParseFilter {
public Set<String> getSupportedFilters () {
return filterHashMap.keySet();
}
+
+ /**
+ * Returns all known filters
+ * @return an unmodifiable map of filters
+ */
+ public static Map<String, String> getAllFilters() {
+ return Collections.unmodifiableMap(filterHashMap);
+ }
+
+ /**
+ * Register a new filter with the parser. If the filter is already registered,
+ * an IllegalArgumentException will be thrown.
+ *
+ * @param name a name for the filter
+ * @param filterClass fully qualified class name
+ */
+ public static void registerFilter(String name, String filterClass) {
+ if(LOG.isInfoEnabled())
+ LOG.info("Registering new filter " + name);
+
+ filterHashMap.put(name, filterClass);
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Sat Apr 21 01:37:15 2012
@@ -1,4 +1,6 @@
/**
+ * Copyright 2011 The Apache Software Foundation
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,13 +17,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,62 +30,58 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner;
+import org.apache.hadoop.hbase.thrift.ThriftUtilities;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer;
-import org.apache.hadoop.hbase.thrift.ThriftMetrics;
-import org.apache.hadoop.hbase.thrift.ThriftServer;
-import org.apache.hadoop.hbase.thrift.ThriftUtilities;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
/**
- * ThriftServer - this class starts up a Thrift server in the same
+ * HRegionThriftServer - this class starts up a Thrift server in the same
* JVM where the RegionServer is running. It inherits most of the
* functionality from the standard ThriftServer. This is good because
* we can maintain compatibility with applications that use the
- * standard Thrift interface. For performance reasons, we override
- * methods to directly invoke calls into the HRegionServer.
+ * standard Thrift interface. For performance reasons, we can override
+ * methods to directly invoke calls into the HRegionServer and avoid the hop.
+ * <p>
+ * This can be enabled with <i>hbase.regionserver.export.thrift</i> set to true.
*/
public class HRegionThriftServer extends Thread {
public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
- public static final int DEFAULT_LISTEN_PORT = 9091;
-
- private HRegionServer rs;
- private Configuration conf;
- private int port;
- private boolean nonblocking;
- private String bindIpAddress;
- private String transport;
- private String protocol;
- volatile private TServer tserver;
- private boolean redirect; // redirect to appropriate Regionserver
+ private final HRegionServer rs;
+ private final ThriftServerRunner serverRunner;
/**
* Create an instance of the glue object that connects the
* RegionServer with the standard ThriftServer implementation
*/
- HRegionThriftServer(HRegionServer regionServer, Configuration conf) {
+ HRegionThriftServer(HRegionServer regionServer, Configuration conf)
+ throws IOException {
+ super("Region Thrift Server");
this.rs = regionServer;
- this.conf = conf;
+ this.serverRunner = new ThriftServerRunner(conf, HConstants.RS_THRIFT_PREFIX,
+ new HBaseHandlerRegion(conf));
+ }
+
+ /**
+ * Stop ThriftServer
+ */
+ void shutdown() {
+ serverRunner.shutdown();
+ }
+
+ @Override
+ public void run() {
+ serverRunner.run();
}
/**
@@ -93,7 +89,14 @@ public class HRegionThriftServer extends
* to use the default implementation for most calls. We override certain calls
* for performance reasons
*/
- private class HBaseHandlerRegion extends ThriftServer.HBaseHandler {
+ private class HBaseHandlerRegion extends ThriftServerRunner.HBaseHandler
+ implements Hbase.Iface {
+
+ /**
+ * Whether requests should be redirected to other RegionServers if the
+ * specified region is not hosted by this RegionServer.
+ */
+ private boolean redirect;
HBaseHandlerRegion(final Configuration conf) throws IOException {
super(conf);
@@ -101,6 +104,14 @@ public class HRegionThriftServer extends
}
/**
+ * Read and initialize config parameters
+ */
+ private void initialize(Configuration conf) {
+ this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
+ false);
+ }
+
+ /**
* Do increments. Shortcircuit to get better performance.
*/
@Override
@@ -130,23 +141,24 @@ public class HRegionThriftServer extends
* Get a record. Shortcircuit to get better performance.
*/
@Override
- public List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row,
- List<byte[]> columns,
+ public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
+ List<ByteBuffer> columns,
long timestamp)
throws IOError {
try {
HTable table = getTable(tableName);
- HRegionLocation location = table.getRegionLocation(row);
+ byte[] rowBytes = Bytes.getBytes(row);
+ HRegionLocation location = table.getRegionLocation(rowBytes);
byte[] regionName = location.getRegionInfo().getRegionName();
if (columns == null) {
- Get get = new Get(row);
+ Get get = new Get(rowBytes);
get.setTimeRange(Long.MIN_VALUE, timestamp);
Result result = rs.get(regionName, get);
return ThriftUtilities.rowResultFromHBase(result);
}
byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
- Get get = new Get(row);
+ Get get = new Get(rowBytes);
for (byte[] column : columnArr) {
byte[][] famAndQf = KeyValue.parseColumn(column);
if (famAndQf.length == 1) {
@@ -168,26 +180,28 @@ public class HRegionThriftServer extends
throw new IOError(e.getMessage());
}
}
+
@Override
- public List<TRowResult> getRowWithColumnPrefix(byte[] tableName,
- byte[] row, byte[] prefix) throws IOError {
+ public List<TRowResult> getRowWithColumnPrefix(ByteBuffer tableName,
+ ByteBuffer row, ByteBuffer prefix) throws IOError {
return (getRowWithColumnPrefixTs(tableName, row, prefix,
HConstants.LATEST_TIMESTAMP));
}
@Override
- public List<TRowResult> getRowWithColumnPrefixTs(byte[] tableName,
- byte[] row, byte[] prefix, long timestamp) throws IOError {
+ public List<TRowResult> getRowWithColumnPrefixTs(ByteBuffer tableName,
+ ByteBuffer row, ByteBuffer prefix, long timestamp) throws IOError {
try {
HTable table = getTable(tableName);
+ byte[] rowBytes = Bytes.getBytes(row);
if (prefix == null) {
- Get get = new Get(row);
+ Get get = new Get(rowBytes);
get.setTimeRange(Long.MIN_VALUE, timestamp);
Result result = table.get(get);
return ThriftUtilities.rowResultFromHBase(result);
}
- Get get = new Get(row);
- byte[][] famAndPrefix = KeyValue.parseColumn(prefix);
+ Get get = new Get(rowBytes);
+ byte[][] famAndPrefix = KeyValue.parseColumn(Bytes.getBytes(prefix));
if (famAndPrefix.length == 2) {
get.addFamily(famAndPrefix[0]);
get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
@@ -201,92 +215,5 @@ public class HRegionThriftServer extends
throw new IOError(e.getMessage());
}
}
-
- }
-
- /**
- * Read and initialize config parameters
- */
- private void initialize(Configuration conf) {
- this.port = conf.getInt("hbase.regionserver.thrift.port",
- DEFAULT_LISTEN_PORT);
- this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress");
- this.protocol = conf.get("hbase.regionserver.thrift.protocol");
- this.transport = conf.get("hbase.regionserver.thrift.transport");
- this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking",
- false);
- this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
- false);
- }
-
- /**
- * Stop ThriftServer
- */
- void shutdown() {
- if (tserver != null) {
- tserver.stop();
- tserver = null;
- }
- }
-
- public void run() {
- try {
- HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf);
- Hbase.Processor processor = new Hbase.Processor(handler);
-
- TProtocolFactory protocolFactory;
- if (this.protocol != null && this.protocol.equals("compact")) {
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- protocolFactory = new TBinaryProtocol.Factory();
- }
-
- if (this.nonblocking) {
- LOG.info("starting HRegionServer Nonblocking Thrift server on " +
- this.port);
- LOG.info("HRegionServer Nonblocking Thrift server does not " +
- "support address binding.");
- TNonblockingServerTransport serverTransport =
- new TNonblockingServerSocket(this.port);
- TFramedTransport.Factory transportFactory =
- new TFramedTransport.Factory();
-
- tserver = new TNonblockingServer(processor, serverTransport,
- transportFactory, protocolFactory);
- } else {
- InetAddress listenAddress = null;
- if (this.bindIpAddress != null) {
- listenAddress = InetAddress.getByName(this.bindIpAddress);
- } else {
- listenAddress = InetAddress.getLocalHost();
- }
- TServerTransport serverTransport = new TServerSocket(
- new InetSocketAddress(listenAddress, port));
-
- TTransportFactory transportFactory;
- if (this.transport != null && this.transport.equals("framed")) {
- transportFactory = new TFramedTransport.Factory();
- } else {
- transportFactory = new TTransportFactory();
- }
-
- TBoundedThreadPoolServer.Options serverOptions =
- new TBoundedThreadPoolServer.Options(conf);
-
- LOG.info("starting " + ThriftServer.THREAD_POOL_SERVER_CLASS.getSimpleName() + " on "
- + listenAddress + ":" + Integer.toString(port)
- + "; minimum number of worker threads="
- + serverOptions.minWorkerThreads
- + ", maximum number of worker threads="
- + serverOptions.maxWorkerThreads + ", queued requests="
- + serverOptions.maxQueuedRequests);
- ThriftMetrics metrics = new ThriftMetrics(port, conf);
- tserver = new TBoundedThreadPoolServer(processor, serverTransport,
- transportFactory, protocolFactory, serverOptions, metrics);
- }
- tserver.serve();
- } catch (Exception e) {
- LOG.warn("Unable to start HRegionServerThrift interface.", e);
- }
}
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java?rev=1328562&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java Sat Apr 21 01:37:15 2012
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A TThreadedSelectorServer.Args that reads hadoop configuration
+ */
+public class HThreadedSelectorServerArgs extends TThreadedSelectorServer.Args {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TThreadedSelectorServer.class);
+
+ /**
+ * Number of selector threads for reading and writing socket
+ */
+ public static final String SELECTOR_THREADS_SUFFIX =
+ "selector.threads";
+
+ /**
+ * Number of threads for processing the thrift calls
+ */
+ public static final String WORKER_THREADS_SUFFIX =
+ "worker.threads";
+
+ /**
+ * Time to wait for server to stop gracefully
+ */
+ public static final String STOP_TIMEOUT_SUFFIX =
+ "stop.timeout.seconds";
+
+ /**
+ * Maximum number of accepted elements per selector
+ */
+ public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_SUFFIX =
+ "accept.queue.size.per.selector";
+
+ /**
+ * The strategy for handling new accepted connections.
+ */
+ public static final String ACCEPT_POLICY_SUFFIX =
+ "accept.policy";
+
+ public HThreadedSelectorServerArgs(TNonblockingServerTransport transport, Configuration conf,
+ String confKeyPrefix) {
+ super(transport);
+ readConf(conf, confKeyPrefix);
+ }
+
+ private void readConf(Configuration conf, String confKeyPrefix) {
+ int selectorThreads = conf.getInt(
+ confKeyPrefix + SELECTOR_THREADS_SUFFIX, getSelectorThreads());
+ int workerThreads = conf.getInt(
+ confKeyPrefix + WORKER_THREADS_SUFFIX, getWorkerThreads());
+ int stopTimeoutVal = conf.getInt(
+ confKeyPrefix + STOP_TIMEOUT_SUFFIX, getStopTimeoutVal());
+ int acceptQueueSizePerThread = conf.getInt(
+ confKeyPrefix + ACCEPT_QUEUE_SIZE_PER_THREAD_SUFFIX, getAcceptQueueSizePerThread());
+ AcceptPolicy acceptPolicy = AcceptPolicy.valueOf(conf.get(
+ confKeyPrefix + ACCEPT_POLICY_SUFFIX, getAcceptPolicy().toString()).toUpperCase());
+
+ super.selectorThreads(selectorThreads)
+ .workerThreads(workerThreads)
+ .stopTimeoutVal(stopTimeoutVal)
+ .acceptQueueSizePerThread(acceptQueueSizePerThread)
+ .acceptPolicy(acceptPolicy);
+
+ LOG.info("Read Thrift server configuration from keys with prefix '" + confKeyPrefix + "':" +
+ " selectorThreads:" + selectorThreads +
+ " workerThreads:" + workerThreads +
+ " stopTimeoutVal:" + stopTimeoutVal + "sec" +
+ " acceptQueueSizePerThread:" + acceptQueueSizePerThread +
+ " acceptPolicy:" + acceptPolicy);
+ }
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java Sat Apr 21 01:37:15 2012
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.thrift;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -33,21 +32,18 @@ import org.apache.hadoop.hbase.thrift.Ca
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
- * A thread pool server customized for HBase.
+ * A bounded thread pool server customized for HBase.
*/
public class TBoundedThreadPoolServer extends TServer {
@@ -55,45 +51,78 @@ public class TBoundedThreadPoolServer ex
"Queue is full, closing connection";
/**
+ * The "core size" of the thread pool. New threads are created on every
+ * connection until this many threads are created.
+ */
+ public static final String MIN_WORKER_THREADS_SUFFIX = "minWorkerThreads";
+
+ /**
* This default core pool size should be enough for many test scenarios. We
* want to override this with a much larger number (e.g. at least 200) for a
* large-scale production setup.
*/
public static final int DEFAULT_MIN_WORKER_THREADS = 16;
- public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
+ /**
+ * The maximum size of the thread pool. When the pending request queue
+ * overflows, new threads are created until their number reaches this number.
+ * After that, the server starts dropping connections.
+ */
+ public static final String MAX_WORKER_THREADS_SUFFIX = "maxWorkerThreads";
- public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
+ public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
- public static final String MIN_WORKER_THREADS_CONF_KEY =
- "hbase.thrift.minWorkerThreads";
+ /**
+ * The maximum number of pending connections waiting in the queue. If there
+ * are no idle threads in the pool, the server queues requests. Only when
+ * the queue overflows, new threads are added, up to
+ * hbase.thrift.maxQueuedRequests threads.
+ */
+ public static final String MAX_QUEUED_REQUESTS_SUFFIX = "maxQueuedRequests";
- public static final String MAX_WORKER_THREADS_CONF_KEY =
- "hbase.thrift.maxWorkerThreads";
+ public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
- public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
- "hbase.thrift.maxQueuedRequests";
+ /**
+ * Default amount of time in seconds to keep a thread alive. Worker threads
+ * are stopped after being idle for this long.
+ */
+ public static final String THREAD_KEEP_ALIVE_TIME_SEC_SUFFIX =
+ "hbase.thrift.threadKeepAliveTimeSec";
- private static final Log LOG = LogFactory.getLog(
- TBoundedThreadPoolServer.class.getName());
+ private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
/**
* Time to wait after interrupting all worker threads. This is after a clean
* shutdown has been attempted.
*/
- public static final int SHUTDOWN_NOW_TIME_MS = 5000;
+ public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
+
+ private static final Log LOG = LogFactory.getLog(
+ TBoundedThreadPoolServer.class.getName());
- public static class Options extends TThreadPoolServer.Options {
- public int maxQueuedRequests;
+ private final CallQueue callQueue;
- public Options(Configuration conf) {
- super();
- minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
+ public static class Args extends TThreadPoolServer.Args {
+ int maxQueuedRequests;
+ int threadKeepAliveTimeSec;
+
+ public Args(TServerTransport transport, Configuration conf, String confKeyPrefix) {
+ super(transport);
+ minWorkerThreads = conf.getInt(confKeyPrefix + MIN_WORKER_THREADS_SUFFIX,
DEFAULT_MIN_WORKER_THREADS);
- maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
+ maxWorkerThreads = conf.getInt(confKeyPrefix + MAX_WORKER_THREADS_SUFFIX,
DEFAULT_MAX_WORKER_THREADS);
- maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
+ maxQueuedRequests = conf.getInt(confKeyPrefix + MAX_QUEUED_REQUESTS_SUFFIX,
DEFAULT_MAX_QUEUED_REQUESTS);
+ threadKeepAliveTimeSec = conf.getInt(confKeyPrefix + THREAD_KEEP_ALIVE_TIME_SEC_SUFFIX,
+ DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
+ }
+
+ @Override
+ public String toString() {
+ return "min worker threads=" + minWorkerThreads
+ + ", max worker threads=" + maxWorkerThreads
+ + ", max queued requests=" + maxQueuedRequests;
}
}
@@ -103,30 +132,16 @@ public class TBoundedThreadPoolServer ex
/** Flag for stopping the server */
private volatile boolean stopped;
- private Options serverOptions;
-
- private final int KEEP_ALIVE_TIME_SEC = 60;
+ private Args serverOptions;
- private final ThriftMetrics metrics;
+ public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
+ super(options);
- public TBoundedThreadPoolServer(TProcessor processor,
- TServerTransport serverTransport,
- TTransportFactory transportFactory,
- TProtocolFactory protocolFactory,
- Options options,
- ThriftMetrics metrics) {
- super(new TProcessorFactory(processor), serverTransport, transportFactory,
- transportFactory, protocolFactory, protocolFactory);
-
- this.metrics = metrics;
-
- BlockingQueue<Runnable> executorQueue;
if (options.maxQueuedRequests > 0) {
- executorQueue = new CallQueue(
+ this.callQueue = new CallQueue(
new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
} else {
- executorQueue = new CallQueue(
- new SynchronousQueue<Call>(), metrics);
+ this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
}
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
@@ -134,8 +149,8 @@ public class TBoundedThreadPoolServer ex
tfb.setNameFormat("thrift-worker-%d");
executorService =
new ThreadPoolExecutor(options.minWorkerThreads,
- options.maxWorkerThreads, KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS,
- executorQueue, tfb.build());
+ options.maxWorkerThreads, options.threadKeepAliveTimeSec,
+ TimeUnit.SECONDS, this.callQueue, tfb.build());
serverOptions = options;
}
@@ -147,19 +162,19 @@ public class TBoundedThreadPoolServer ex
return;
}
- Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook") {
- @Override
- public void run() {
- TBoundedThreadPoolServer.this.stop();
- }
- });
+ Runtime.getRuntime().addShutdownHook(
+ new Thread(getClass().getSimpleName() + "-shutdown-hook") {
+ @Override
+ public void run() {
+ TBoundedThreadPoolServer.this.stop();
+ }
+ });
stopped = false;
while (!stopped && !Thread.interrupted()) {
TTransport client = null;
try {
client = serverTransport_.accept();
- metrics.incNumConnections(1);
} catch (TTransportException ttx) {
if (!stopped) {
LOG.warn("Transport error when accepting message", ttx);
@@ -181,7 +196,6 @@ public class TBoundedThreadPoolServer ex
} else {
LOG.warn(QUEUE_FULL_MSG, rex);
}
- metrics.incNumConnections(-1);
client.close();
}
}
@@ -218,11 +232,11 @@ public class TBoundedThreadPoolServer ex
}
LOG.info("Interrupting all worker threads and waiting for "
- + SHUTDOWN_NOW_TIME_MS + " ms longer");
+ + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
// This will interrupt all the threads, even those running a task.
executorService.shutdownNow();
- Threads.sleepWithoutInterrupt(SHUTDOWN_NOW_TIME_MS);
+ Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
// Preserve the interrupted status.
if (interrupted) {
@@ -283,7 +297,6 @@ public class TBoundedThreadPoolServer ex
if (outputTransport != null) {
outputTransport.close();
}
- metrics.incNumConnections(-1);
}
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java Sat Apr 21 01:37:15 2012
@@ -24,7 +24,6 @@ import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -32,7 +31,6 @@ import org.apache.hadoop.metrics.Updater
import org.apache.hadoop.metrics.util.MetricsBase;
import org.apache.hadoop.metrics.util.MetricsIntValue;
import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
/**
@@ -53,8 +51,10 @@ public class ThriftMetrics implements Up
private final MetricsIntValue callQueueLen =
new MetricsIntValue("callQueueLen", registry);
- private final MetricsTimeVaryingInt numConnections =
- new MetricsTimeVaryingInt("numConnections", registry);
+ private final MetricsTimeVaryingRate numRowKeysInBatchGet =
+ new MetricsTimeVaryingRate("numRowKeysInBatchGet", registry);
+ private final MetricsTimeVaryingRate numRowKeysInBatchMutate =
+ new MetricsTimeVaryingRate("numRowKeysInBatchMutate", registry);
private final MetricsTimeVaryingRate numBatchGetRowKeys =
new MetricsTimeVaryingRate("numBatchGetRowKeys", registry);
private final MetricsTimeVaryingRate numBatchMutateRowKeys =
@@ -66,7 +66,7 @@ public class ThriftMetrics implements Up
private MetricsTimeVaryingRate slowThriftCall =
new MetricsTimeVaryingRate("slowThriftCall", registry);
- public ThriftMetrics(int port, Configuration conf) {
+ public ThriftMetrics(int port, Configuration conf, Class<?> iface) {
slowResponseTime = conf.getLong(
SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
context = MetricsUtil.getContext(CONTEXT_NAME);
@@ -78,7 +78,7 @@ public class ThriftMetrics implements Up
context.registerUpdater(this);
- createMetricsForMethods(Hbase.Iface.class);
+ createMetricsForMethods(iface);
}
public void incTimeInQueue(long time) {
@@ -89,8 +89,12 @@ public class ThriftMetrics implements Up
callQueueLen.set(len);
}
- public void incNumConnections(int diff) {
- numConnections.inc(diff);
+ public void incNumRowKeysInBatchGet(int diff) {
+ numRowKeysInBatchGet.inc(diff);
+ }
+
+ public void incNumRowKeysInBatchMutate(int diff) {
+ numRowKeysInBatchMutate.inc(diff);
}
public void incNumBatchGetRowKeys(int diff) {
@@ -120,6 +124,7 @@ public class ThriftMetrics implements Up
}
private void createMetricsForMethods(Class<?> iface) {
+ LOG.debug("Creating metrics for interface " + iface.toString());
for (Method m : iface.getDeclaredMethods()) {
if (getMethodTimeMetrics(m.getName()) == null)
LOG.debug("Creating metrics for method:" + m.getName());