You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/04/21 03:37:16 UTC

svn commit: r1328562 [1/9] - in /hbase/branches/0.89-fb: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/filter/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/thrift/ src/main/...

Author: mbautin
Date: Sat Apr 21 01:37:15 2012
New Revision: 1328562

URL: http://svn.apache.org/viewvc?rev=1328562&view=rev
Log:
[jira] [HBASE-5803] [89-fb] Upgrade hbase 0.89-fb to Thrift 0.8.0 and bring Thrift server enhancements from trunk

Summary: TBoundedThreadPoolServer has been a problem for us when there is a
large number of clients. We need to migrate to 0.8.0. in 89-fb and bring the
relevant improvements from trunk, including supporting TThreadedSelectorServer.

Test Plan:
- Run unit tests
- Deploy to a test cluster, turn on the load, and monitor
correctness/performance

Reviewers: schen, nspiegelberg, kannan, liyintang

Reviewed By: schen

Differential Revision: https://reviews.facebook.net/D2811

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
      - copied, changed from r1328561, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHsHaServerCmdLine.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestNonblockingServerCmdLine.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThreadPoolServerFramedCmdLine.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThreadPoolServerUnframedCmdLine.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThreadedSelectorServerCmdLine.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerLegacy.java
      - copied, changed from r1328561, hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/ThriftServerCmdLineTestBase.java
Modified:
    hbase/branches/0.89-fb/pom.xml
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/AlreadyExists.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/BatchMutation.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/ColumnDescriptor.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IOError.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/IllegalArgument.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/Mutation.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TCell.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRegionInfo.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRowResult.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/generated/TScan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestCallQueue.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java

Modified: hbase/branches/0.89-fb/pom.xml
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/pom.xml?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/pom.xml (original)
+++ hbase/branches/0.89-fb/pom.xml Sat Apr 21 01:37:15 2012
@@ -491,7 +491,7 @@
     <protobuf.version>2.3.0</protobuf.version>
     <slf4j.version>1.5.8</slf4j.version>
     <stax-api>1.0.1</stax-api>
-    <thrift.version>0.2.0</thrift.version>
+    <thrift.version>0.8.0</thrift.version>
     <guava.version>r09</guava.version>
   </properties>
 
@@ -627,7 +627,7 @@
     -->
     <dependency>
       <groupId>org.apache.thrift</groupId>
-      <artifactId>thrift</artifactId>
+      <artifactId>libthrift</artifactId>
       <version>${thrift.version}</version>
       <exclusions>
         <exclusion>

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Apr 21 01:37:15 2012
@@ -19,6 +19,8 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -302,16 +304,22 @@ public final class HConstants {
    */
   public static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
 
+  public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.wrap(EMPTY_BYTE_ARRAY);
+
   /**
    * Used by scanners, etc when they want to start at the beginning of a region
    */
   public static final byte [] EMPTY_START_ROW = EMPTY_BYTE_ARRAY;
 
+  public static final ByteBuffer EMPTY_START_ROW_BUF = ByteBuffer.wrap(EMPTY_START_ROW);
+
   /**
    * Last row in a table.
    */
   public static final byte [] EMPTY_END_ROW = EMPTY_START_ROW;
 
+  public static final ByteBuffer EMPTY_END_ROW_BUF = ByteBuffer.wrap(EMPTY_END_ROW);
+
   /**
     * Used by scanners and others when they're trying to detect the end of a
     * table
@@ -491,6 +499,35 @@ public final class HConstants {
 
   public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop";
 
+  // Thrift server configuration options
+
+  /** Configuration key prefix for the stand-alone thrift proxy */
+  public static final String THRIFT_PROXY_PREFIX = "hbase.thrift.";
+
+  /** Configuration key prefix for thrift server embedded into the region server */
+  public static final String RS_THRIFT_PREFIX = "hbase.regionserver.thrift.";
+
+  /** Default port for the stand-alone thrift proxy */
+  public static final int DEFAULT_THRIFT_PROXY_PORT = 9090;
+
+  /** Default port for the thrift server embedded into regionserver */
+  public static final int DEFAULT_RS_THRIFT_SERVER_PORT = 9091;
+
+  /** Configuration key suffix for thrift server type (e.g. thread pool, nonblocking, etc.) */
+  public static final String THRIFT_SERVER_TYPE_SUFFIX = "server.type";
+
+  /** Configuration key suffix for the IP address for thrift server to bind to */
+  public static final String THRIFT_BIND_SUFFIX = "ipaddress";
+
+  /** Configuration key suffix for whether to use compact Thrift transport */
+  public static final String THRIFT_COMPACT_SUFFIX = "compact";
+
+  /** Configuration key suffix for whether to use framed Thrift transport */
+  public static final String THRIFT_FRAMED_SUFFIX = "framed";
+
+  /** Configuration key suffix for Thrift server port */
+  public static final String THRIFT_PORT_SUFFIX = "port";
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/filter/ParseFilter.java Sat Apr 21 01:37:15 2012
@@ -19,26 +19,23 @@
  */
 package org.apache.hadoop.hbase.filter;
 
-import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.util.TreeSet;
 import java.util.ArrayList;
-import java.util.Stack;
+import java.util.Collections;
+import java.util.EmptyStackException;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
+import java.util.Stack;
 
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.ParseConstants;
-
-import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import java.lang.ArrayIndexOutOfBoundsException;
-import java.lang.ClassCastException;
-import java.lang.reflect.*;
-import java.util.EmptyStackException;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * This class allows a user to specify a filter via a string
@@ -51,6 +48,7 @@ import java.util.EmptyStackException;
  *
  */
 public class ParseFilter {
+  private static final Log LOG = LogFactory.getLog(ParseFilter.class);
 
   private static HashMap<ByteBuffer, Integer> operatorPrecedenceHashMap;
   private static HashMap<String, String> filterHashMap;
@@ -841,4 +839,27 @@ public class ParseFilter {
   public Set<String> getSupportedFilters () {
     return filterHashMap.keySet();
   }
+
+  /**
+   * Returns all known filters
+   * @return an unmodifiable map of filters
+   */
+  public static Map<String, String> getAllFilters() {
+    return Collections.unmodifiableMap(filterHashMap);
+  }
+
+  /**
+   * Register a new filter with the parser.  If the filter is already registered,
+   * an IllegalArgumentException will be thrown.
+   *
+   * @param name a name for the filter
+   * @param filterClass fully qualified class name
+   */
+  public static void registerFilter(String name, String filterClass) {
+    if(LOG.isInfoEnabled())
+      LOG.info("Registering new filter " + name);
+
+    filterHashMap.put(name, filterClass);
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Sat Apr 21 01:37:15 2012
@@ -1,4 +1,6 @@
 /**
+ * Copyright 2011 The Apache Software Foundation
+ *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,13 +17,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,62 +30,58 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner;
+import org.apache.hadoop.hbase.thrift.ThriftUtilities;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.hbase.thrift.generated.IOError;
 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer;
-import org.apache.hadoop.hbase.thrift.ThriftMetrics;
-import org.apache.hadoop.hbase.thrift.ThriftServer;
-import org.apache.hadoop.hbase.thrift.ThriftUtilities;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
 
 /**
- * ThriftServer - this class starts up a Thrift server in the same
+ * HRegionThriftServer - this class starts up a Thrift server in the same
  * JVM where the RegionServer is running. It inherits most of the
  * functionality from the standard ThriftServer. This is good because
  * we can maintain compatibility with applications that use the
- * standard Thrift interface. For performance reasons, we override
- * methods to directly invoke calls into the HRegionServer.
+ * standard Thrift interface. For performance reasons, we can override
+ * methods to directly invoke calls into the HRegionServer and avoid the hop.
+ * <p>
+ * This can be enabled with <i>hbase.regionserver.export.thrift</i> set to true.
  */
 public class HRegionThriftServer extends Thread {
 
   public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
-  public static final int DEFAULT_LISTEN_PORT = 9091;
-
-  private HRegionServer rs;
-  private Configuration conf;
 
-  private int port;
-  private boolean nonblocking;
-  private String bindIpAddress;
-  private String transport;
-  private String protocol;
-  volatile private TServer tserver;
-  private boolean redirect; // redirect to appropriate Regionserver
+  private final HRegionServer rs;
+  private final ThriftServerRunner serverRunner;
 
   /**
    * Create an instance of the glue object that connects the
    * RegionServer with the standard ThriftServer implementation
    */
-  HRegionThriftServer(HRegionServer regionServer, Configuration conf) {
+  HRegionThriftServer(HRegionServer regionServer, Configuration conf)
+      throws IOException {
+    super("Region Thrift Server");
     this.rs = regionServer;
-    this.conf = conf;
+    this.serverRunner = new ThriftServerRunner(conf, HConstants.RS_THRIFT_PREFIX,
+        new HBaseHandlerRegion(conf));
+  }
+
+  /**
+   * Stop ThriftServer
+   */
+  void shutdown() {
+    serverRunner.shutdown();
+  }
+
+  @Override
+  public void run() {
+    serverRunner.run();
   }
 
   /**
@@ -93,7 +89,14 @@ public class HRegionThriftServer extends
    * to use the default implementation for most calls. We override certain calls
    * for performance reasons
    */
-  private class HBaseHandlerRegion extends ThriftServer.HBaseHandler {
+  private class HBaseHandlerRegion extends ThriftServerRunner.HBaseHandler
+      implements Hbase.Iface {
+
+    /**
+     * Whether requests should be redirected to other RegionServers if the
+     * specified region is not hosted by this RegionServer.
+     */
+    private boolean redirect;
 
     HBaseHandlerRegion(final Configuration conf) throws IOException {
       super(conf);
@@ -101,6 +104,14 @@ public class HRegionThriftServer extends
     }
 
     /**
+     * Read and initialize config parameters
+     */
+    private void initialize(Configuration conf) {
+      this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
+          false);
+    }
+
+    /**
      * Do increments. Shortcircuit to get better performance.
      */
     @Override
@@ -130,23 +141,24 @@ public class HRegionThriftServer extends
      * Get a record. Shortcircuit to get better performance.
      */
     @Override
-    public List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row,
-                                                List<byte[]> columns,
+    public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
+                                                List<ByteBuffer> columns,
                                                 long timestamp)
       throws IOError {
       try {
         HTable table = getTable(tableName);
-        HRegionLocation location = table.getRegionLocation(row);
+        byte[] rowBytes = Bytes.getBytes(row);
+        HRegionLocation location = table.getRegionLocation(rowBytes);
         byte[] regionName = location.getRegionInfo().getRegionName();
 
         if (columns == null) {
-          Get get = new Get(row);
+          Get get = new Get(rowBytes);
           get.setTimeRange(Long.MIN_VALUE, timestamp);
           Result result = rs.get(regionName, get);
           return ThriftUtilities.rowResultFromHBase(result);
         }
         byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
-        Get get = new Get(row);
+        Get get = new Get(rowBytes);
         for (byte[] column : columnArr) {
           byte[][] famAndQf = KeyValue.parseColumn(column);
           if (famAndQf.length == 1) {
@@ -168,26 +180,28 @@ public class HRegionThriftServer extends
         throw new IOError(e.getMessage());
       }
     }
+
     @Override
-    public List<TRowResult> getRowWithColumnPrefix(byte[] tableName,
-        byte[] row, byte[] prefix) throws IOError {
+    public List<TRowResult> getRowWithColumnPrefix(ByteBuffer tableName,
+        ByteBuffer row, ByteBuffer prefix) throws IOError {
       return (getRowWithColumnPrefixTs(tableName, row, prefix,
           HConstants.LATEST_TIMESTAMP));
     }
 
     @Override
-    public List<TRowResult> getRowWithColumnPrefixTs(byte[] tableName,
-        byte[] row, byte[] prefix, long timestamp) throws IOError {
+    public List<TRowResult> getRowWithColumnPrefixTs(ByteBuffer tableName,
+        ByteBuffer row, ByteBuffer prefix, long timestamp) throws IOError {
       try {
         HTable table = getTable(tableName);
+        byte[] rowBytes = Bytes.getBytes(row);
         if (prefix == null) {
-          Get get = new Get(row);
+          Get get = new Get(rowBytes);
           get.setTimeRange(Long.MIN_VALUE, timestamp);
           Result result = table.get(get);
           return ThriftUtilities.rowResultFromHBase(result);
         }
-        Get get = new Get(row);
-        byte[][] famAndPrefix = KeyValue.parseColumn(prefix);
+        Get get = new Get(rowBytes);
+        byte[][] famAndPrefix = KeyValue.parseColumn(Bytes.getBytes(prefix));
         if (famAndPrefix.length == 2) {
           get.addFamily(famAndPrefix[0]);
           get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
@@ -201,92 +215,5 @@ public class HRegionThriftServer extends
         throw new IOError(e.getMessage());
       }
     }
-
-  }
-
-  /**
-   * Read and initialize config parameters
-   */
-  private void initialize(Configuration conf) {
-    this.port = conf.getInt("hbase.regionserver.thrift.port",
-                            DEFAULT_LISTEN_PORT);
-    this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress");
-    this.protocol = conf.get("hbase.regionserver.thrift.protocol");
-    this.transport = conf.get("hbase.regionserver.thrift.transport");
-    this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking",
-                                       false);
-    this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
-                                       false);
-  }
-
-  /**
-   * Stop ThriftServer
-   */
-  void shutdown() {
-    if (tserver != null) {
-      tserver.stop();
-      tserver = null;
-    }
-  }
-
-  public void run() {
-    try {
-      HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf);
-      Hbase.Processor processor = new Hbase.Processor(handler);
-
-      TProtocolFactory protocolFactory;
-      if (this.protocol != null && this.protocol.equals("compact")) {
-        protocolFactory = new TCompactProtocol.Factory();
-      } else {
-        protocolFactory = new TBinaryProtocol.Factory();
-      }
-
-      if (this.nonblocking) {
-        LOG.info("starting HRegionServer Nonblocking Thrift server on " +
-                 this.port);
-        LOG.info("HRegionServer Nonblocking Thrift server does not " +
-                 "support address binding.");
-        TNonblockingServerTransport serverTransport =
-          new TNonblockingServerSocket(this.port);
-        TFramedTransport.Factory transportFactory =
-          new TFramedTransport.Factory();
-
-        tserver = new TNonblockingServer(processor, serverTransport,
-                                        transportFactory, protocolFactory);
-      } else {
-        InetAddress listenAddress = null;
-        if (this.bindIpAddress != null) {
-          listenAddress = InetAddress.getByName(this.bindIpAddress);
-        } else {
-          listenAddress = InetAddress.getLocalHost();
-        }
-        TServerTransport serverTransport = new TServerSocket(
-           new InetSocketAddress(listenAddress, port));
-
-        TTransportFactory transportFactory;
-        if (this.transport != null && this.transport.equals("framed")) {
-          transportFactory = new TFramedTransport.Factory();
-        } else {
-          transportFactory = new TTransportFactory();
-        }
-
-        TBoundedThreadPoolServer.Options serverOptions =
-            new TBoundedThreadPoolServer.Options(conf);
-
-        LOG.info("starting " + ThriftServer.THREAD_POOL_SERVER_CLASS.getSimpleName() + " on "
-            + listenAddress + ":" + Integer.toString(port)
-            + "; minimum number of worker threads="
-            + serverOptions.minWorkerThreads
-            + ", maximum number of worker threads="
-            + serverOptions.maxWorkerThreads + ", queued requests="
-            + serverOptions.maxQueuedRequests);
-        ThriftMetrics metrics = new ThriftMetrics(port, conf);
-        tserver = new TBoundedThreadPoolServer(processor, serverTransport,
-                                       transportFactory, protocolFactory, serverOptions, metrics);
-      }
-      tserver.serve();
-    } catch (Exception e) {
-      LOG.warn("Unable to start HRegionServerThrift interface.", e);
-    }
   }
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java?rev=1328562&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java Sat Apr 21 01:37:15 2012
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A TThreadedSelectorServer.Args that reads hadoop configuration
+ */
+public class HThreadedSelectorServerArgs extends TThreadedSelectorServer.Args {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TThreadedSelectorServer.class);
+
+  /**
+   * Number of selector threads for reading and writing socket
+   */
+  public static final String SELECTOR_THREADS_SUFFIX =
+      "selector.threads";
+
+  /**
+   * Number of threads for processing the thrift calls
+   */
+  public static final String WORKER_THREADS_SUFFIX =
+      "worker.threads";
+
+  /**
+   * Time to wait for server to stop gracefully
+   */
+  public static final String STOP_TIMEOUT_SUFFIX =
+      "stop.timeout.seconds";
+
+  /**
+   * Maximum number of accepted elements per selector
+   */
+  public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_SUFFIX =
+      "accept.queue.size.per.selector";
+
+  /**
+   * The strategy for handling new accepted connections.
+   */
+  public static final String ACCEPT_POLICY_SUFFIX =
+      "accept.policy";
+
+  public HThreadedSelectorServerArgs(TNonblockingServerTransport transport, Configuration conf,
+      String confKeyPrefix) {
+    super(transport);
+    readConf(conf, confKeyPrefix);
+  }
+
+  private void readConf(Configuration conf, String confKeyPrefix) {
+    int selectorThreads = conf.getInt(
+        confKeyPrefix + SELECTOR_THREADS_SUFFIX, getSelectorThreads());
+    int workerThreads = conf.getInt(
+        confKeyPrefix + WORKER_THREADS_SUFFIX, getWorkerThreads());
+    int stopTimeoutVal = conf.getInt(
+        confKeyPrefix + STOP_TIMEOUT_SUFFIX, getStopTimeoutVal());
+    int acceptQueueSizePerThread = conf.getInt(
+        confKeyPrefix + ACCEPT_QUEUE_SIZE_PER_THREAD_SUFFIX, getAcceptQueueSizePerThread());
+    AcceptPolicy acceptPolicy = AcceptPolicy.valueOf(conf.get(
+        confKeyPrefix + ACCEPT_POLICY_SUFFIX, getAcceptPolicy().toString()).toUpperCase());
+
+    super.selectorThreads(selectorThreads)
+         .workerThreads(workerThreads)
+         .stopTimeoutVal(stopTimeoutVal)
+         .acceptQueueSizePerThread(acceptQueueSizePerThread)
+         .acceptPolicy(acceptPolicy);
+
+    LOG.info("Read Thrift server configuration from keys with prefix '" + confKeyPrefix + "':" +
+             " selectorThreads:" + selectorThreads +
+             " workerThreads:" + workerThreads +
+             " stopTimeoutVal:" + stopTimeoutVal + "sec" +
+             " acceptQueueSizePerThread:" + acceptQueueSizePerThread +
+             " acceptPolicy:" + acceptPolicy);
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java Sat Apr 21 01:37:15 2012
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hbase.thrift;
 
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
@@ -33,21 +32,18 @@ import org.apache.hadoop.hbase.thrift.Ca
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * A thread pool server customized for HBase.
+ * A bounded thread pool server customized for HBase.
  */
 public class TBoundedThreadPoolServer extends TServer {
 
@@ -55,45 +51,78 @@ public class TBoundedThreadPoolServer ex
       "Queue is full, closing connection";
 
   /**
+   * The "core size" of the thread pool. New threads are created on every
+   * connection until this many threads are created.
+   */
+  public static final String MIN_WORKER_THREADS_SUFFIX = "minWorkerThreads";
+
+  /**
    * This default core pool size should be enough for many test scenarios. We
    * want to override this with a much larger number (e.g. at least 200) for a
    * large-scale production setup.
    */
   public static final int DEFAULT_MIN_WORKER_THREADS = 16;
 
-  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
+  /**
+   * The maximum size of the thread pool. When the pending request queue
+   * overflows, new threads are created until their number reaches this number.
+   * After that, the server starts dropping connections.
+   */
+  public static final String MAX_WORKER_THREADS_SUFFIX = "maxWorkerThreads";
 
-  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
+  public static final int DEFAULT_MAX_WORKER_THREADS = 1000;
 
-  public static final String MIN_WORKER_THREADS_CONF_KEY =
-      "hbase.thrift.minWorkerThreads";
+  /**
+   * The maximum number of pending connections waiting in the queue. If there
+   * are no idle threads in the pool, the server queues requests. Only when
+   * the queue overflows, new threads are added, up to
+   * hbase.thrift.maxQueuedRequests threads.
+   */
+  public static final String MAX_QUEUED_REQUESTS_SUFFIX = "maxQueuedRequests";
 
-  public static final String MAX_WORKER_THREADS_CONF_KEY =
-      "hbase.thrift.maxWorkerThreads";
+  public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;
 
-  public static final String MAX_QUEUED_REQUESTS_CONF_KEY =
-      "hbase.thrift.maxQueuedRequests";
+  /**
+   * Default amount of time in seconds to keep a thread alive. Worker threads
+   * are stopped after being idle for this long.
+   */
+  public static final String THREAD_KEEP_ALIVE_TIME_SEC_SUFFIX =
+      "hbase.thrift.threadKeepAliveTimeSec";
 
-  private static final Log LOG = LogFactory.getLog(
-      TBoundedThreadPoolServer.class.getName());
+  private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;
 
   /**
    * Time to wait after interrupting all worker threads. This is after a clean
    * shutdown has been attempted.
    */
-  public static final int SHUTDOWN_NOW_TIME_MS = 5000;
+  public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;
+
+  private static final Log LOG = LogFactory.getLog(
+      TBoundedThreadPoolServer.class.getName());
 
-  public static class Options extends TThreadPoolServer.Options {
-    public int maxQueuedRequests;
+  private final CallQueue callQueue;
 
-    public Options(Configuration conf) {
-      super();
-      minWorkerThreads = conf.getInt(MIN_WORKER_THREADS_CONF_KEY,
+  public static class Args extends TThreadPoolServer.Args {
+    int maxQueuedRequests;
+    int threadKeepAliveTimeSec;
+
+    public Args(TServerTransport transport, Configuration conf, String confKeyPrefix) {
+      super(transport);
+      minWorkerThreads = conf.getInt(confKeyPrefix + MIN_WORKER_THREADS_SUFFIX,
           DEFAULT_MIN_WORKER_THREADS);
-      maxWorkerThreads = conf.getInt(MAX_WORKER_THREADS_CONF_KEY,
+      maxWorkerThreads = conf.getInt(confKeyPrefix + MAX_WORKER_THREADS_SUFFIX,
           DEFAULT_MAX_WORKER_THREADS);
-      maxQueuedRequests = conf.getInt(MAX_QUEUED_REQUESTS_CONF_KEY,
+      maxQueuedRequests = conf.getInt(confKeyPrefix + MAX_QUEUED_REQUESTS_SUFFIX,
           DEFAULT_MAX_QUEUED_REQUESTS);
+      threadKeepAliveTimeSec = conf.getInt(confKeyPrefix + THREAD_KEEP_ALIVE_TIME_SEC_SUFFIX,
+          DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
+    }
+
+    @Override
+    public String toString() {
+      return "min worker threads=" + minWorkerThreads
+          + ", max worker threads=" + maxWorkerThreads
+          + ", max queued requests=" + maxQueuedRequests;
     }
   }
 
@@ -103,30 +132,16 @@ public class TBoundedThreadPoolServer ex
   /** Flag for stopping the server */
   private volatile boolean stopped;
 
-  private Options serverOptions;
-
-  private final int KEEP_ALIVE_TIME_SEC = 60;
+  private Args serverOptions;
 
-  private final ThriftMetrics metrics;
+  public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
+    super(options);
 
-  public TBoundedThreadPoolServer(TProcessor processor,
-      TServerTransport serverTransport,
-      TTransportFactory transportFactory,
-      TProtocolFactory protocolFactory,
-      Options options,
-      ThriftMetrics metrics) {
-    super(new TProcessorFactory(processor), serverTransport, transportFactory,
-        transportFactory, protocolFactory, protocolFactory);
-
-    this.metrics = metrics;
-
-    BlockingQueue<Runnable> executorQueue;
     if (options.maxQueuedRequests > 0) {
-      executorQueue = new CallQueue(
+      this.callQueue = new CallQueue(
           new LinkedBlockingQueue<Call>(options.maxQueuedRequests), metrics);
     } else {
-      executorQueue = new CallQueue(
-          new SynchronousQueue<Call>(), metrics);
+      this.callQueue = new CallQueue(new SynchronousQueue<Call>(), metrics);
     }
 
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
@@ -134,8 +149,8 @@ public class TBoundedThreadPoolServer ex
     tfb.setNameFormat("thrift-worker-%d");
     executorService =
         new ThreadPoolExecutor(options.minWorkerThreads,
-            options.maxWorkerThreads, KEEP_ALIVE_TIME_SEC, TimeUnit.SECONDS,
-            executorQueue, tfb.build());
+            options.maxWorkerThreads, options.threadKeepAliveTimeSec,
+            TimeUnit.SECONDS, this.callQueue, tfb.build());
     serverOptions = options;
   }
 
@@ -147,19 +162,19 @@ public class TBoundedThreadPoolServer ex
       return;
     }
 
-    Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook") {
-      @Override
-      public void run() {
-        TBoundedThreadPoolServer.this.stop();
-      }
-    });
+    Runtime.getRuntime().addShutdownHook(
+        new Thread(getClass().getSimpleName() + "-shutdown-hook") {
+          @Override
+          public void run() {
+            TBoundedThreadPoolServer.this.stop();
+          }
+        });
 
     stopped = false;
     while (!stopped && !Thread.interrupted()) {
       TTransport client = null;
       try {
         client = serverTransport_.accept();
-        metrics.incNumConnections(1);
       } catch (TTransportException ttx) {
         if (!stopped) {
           LOG.warn("Transport error when accepting message", ttx);
@@ -181,7 +196,6 @@ public class TBoundedThreadPoolServer ex
         } else {
           LOG.warn(QUEUE_FULL_MSG, rex);
         }
-        metrics.incNumConnections(-1);
         client.close();
       }
     }
@@ -218,11 +232,11 @@ public class TBoundedThreadPoolServer ex
     }
 
     LOG.info("Interrupting all worker threads and waiting for "
-        + SHUTDOWN_NOW_TIME_MS + " ms longer");
+        + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
 
     // This will interrupt all the threads, even those running a task.
     executorService.shutdownNow();
-    Threads.sleepWithoutInterrupt(SHUTDOWN_NOW_TIME_MS);
+    Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
 
     // Preserve the interrupted status.
     if (interrupted) {
@@ -283,7 +297,6 @@ public class TBoundedThreadPoolServer ex
       if (outputTransport != null) {
         outputTransport.close();
       }
-      metrics.incNumConnections(-1);
     }
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java Sat Apr 21 01:37:15 2012
@@ -24,7 +24,6 @@ import java.lang.reflect.Method;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
@@ -32,7 +31,6 @@ import org.apache.hadoop.metrics.Updater
 import org.apache.hadoop.metrics.util.MetricsBase;
 import org.apache.hadoop.metrics.util.MetricsIntValue;
 import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 
 /**
@@ -53,8 +51,10 @@ public class ThriftMetrics implements Up
 
   private final MetricsIntValue callQueueLen =
       new MetricsIntValue("callQueueLen", registry);
-  private final MetricsTimeVaryingInt numConnections =
-      new MetricsTimeVaryingInt("numConnections", registry);
+  private final MetricsTimeVaryingRate numRowKeysInBatchGet =
+      new MetricsTimeVaryingRate("numRowKeysInBatchGet", registry);
+  private final MetricsTimeVaryingRate numRowKeysInBatchMutate =
+      new MetricsTimeVaryingRate("numRowKeysInBatchMutate", registry);
   private final MetricsTimeVaryingRate numBatchGetRowKeys =
       new MetricsTimeVaryingRate("numBatchGetRowKeys", registry);
   private final MetricsTimeVaryingRate numBatchMutateRowKeys =
@@ -66,7 +66,7 @@ public class ThriftMetrics implements Up
   private MetricsTimeVaryingRate slowThriftCall =
       new MetricsTimeVaryingRate("slowThriftCall", registry);
 
-  public ThriftMetrics(int port, Configuration conf) {
+  public ThriftMetrics(int port, Configuration conf, Class<?> iface) {
     slowResponseTime = conf.getLong(
         SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
     context = MetricsUtil.getContext(CONTEXT_NAME);
@@ -78,7 +78,7 @@ public class ThriftMetrics implements Up
 
     context.registerUpdater(this);
 
-    createMetricsForMethods(Hbase.Iface.class);
+    createMetricsForMethods(iface);
   }
 
   public void incTimeInQueue(long time) {
@@ -89,8 +89,12 @@ public class ThriftMetrics implements Up
     callQueueLen.set(len);
   }
 
-  public void incNumConnections(int diff) {
-    numConnections.inc(diff);
+  public void incNumRowKeysInBatchGet(int diff) {
+    numRowKeysInBatchGet.inc(diff);
+  }
+
+  public void incNumRowKeysInBatchMutate(int diff) {
+    numRowKeysInBatchMutate.inc(diff);
   }
 
   public void incNumBatchGetRowKeys(int diff) {
@@ -120,6 +124,7 @@ public class ThriftMetrics implements Up
   }
 
   private void createMetricsForMethods(Class<?> iface) {
+    LOG.debug("Creating metrics for interface " + iface.toString());
     for (Method m : iface.getDeclaredMethods()) {
       if (getMethodTimeMetrics(m.getName()) == null)
         LOG.debug("Creating metrics for method:" + m.getName());