You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/17 18:38:58 UTC
svn commit: r1232504 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/thrift/
src/test/java/org/apache/hadoop/hbase/thrift/
Author: tedyu
Date: Tue Jan 17 17:38:58 2012
New Revision: 1232504
URL: http://svn.apache.org/viewvc?rev=1232504&view=rev
Log:
HBASE-5201 Utilize TThreadedSelectorServer and remove redundant code in ThriftServer and HRegionThriftServer (Scott Chen)
Modified:
hbase/trunk/pom.xml
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
Modified: hbase/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/pom.xml?rev=1232504&r1=1232503&r2=1232504&view=diff
==============================================================================
--- hbase/trunk/pom.xml (original)
+++ hbase/trunk/pom.xml Tue Jan 17 17:38:58 2012
@@ -887,7 +887,7 @@
<protobuf.version>2.4.0a</protobuf.version>
<slf4j.version>1.5.8</slf4j.version><!-- newer version available -->
<stax-api.version>1.0.1</stax-api.version>
- <thrift.version>0.7.0</thrift.version>
+ <thrift.version>0.8.0</thrift.version>
<zookeeper.version>3.4.2</zookeeper.version>
<hadoop-snappy.version>0.0.1-SNAPSHOT</hadoop-snappy.version>
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1232504&r1=1232503&r2=1232504&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Tue Jan 17 17:38:58 2012
@@ -20,8 +20,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
@@ -34,24 +32,10 @@ import org.apache.hadoop.hbase.NotServin
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner;
+import org.apache.hadoop.hbase.thrift.ThriftUtilities;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer;
-import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer.Args;
-import org.apache.hadoop.hbase.thrift.ThriftServer;
-import org.apache.hadoop.hbase.thrift.ThriftUtilities;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
/**
* HRegionThriftServer - this class starts up a Thrift server in the same
@@ -68,43 +52,59 @@ public class HRegionThriftServer extends
public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
public static final int DEFAULT_LISTEN_PORT = 9090;
- private HRegionServer rs;
- private Configuration conf;
-
- private int port;
- private boolean nonblocking;
- private String bindIpAddress;
- private String transport;
- private String protocol;
- volatile private TServer tserver;
-
- /**
- * Whether requests should be redirected to other RegionServers if the
- * specified region is not hosted by this RegionServer.
- */
- private boolean redirect;
+ private final HRegionServer rs;
+ private final ThriftServerRunner serverRunner;
/**
* Create an instance of the glue object that connects the
* RegionServer with the standard ThriftServer implementation
*/
- HRegionThriftServer(HRegionServer regionServer, Configuration conf) {
+ HRegionThriftServer(HRegionServer regionServer, Configuration conf)
+ throws IOException {
+ super("Region Thrift Server");
this.rs = regionServer;
- this.conf = conf;
+ this.serverRunner =
+ new ThriftServerRunner(conf, new HBaseHandlerRegion(conf));
}
/**
- * Inherit the Handler from the standard ThriftServer. This allows us
+ * Stop ThriftServer
+ */
+ void shutdown() {
+ serverRunner.shutdown();
+ }
+
+ @Override
+ public void run() {
+ serverRunner.run();
+ }
+
+ /**
+ * Inherit the Handler from the standard ThriftServerRunner. This allows us
* to use the default implementation for most calls. We override certain calls
* for performance reasons
*/
- private class HBaseHandlerRegion extends ThriftServer.HBaseHandler {
+ private class HBaseHandlerRegion extends ThriftServerRunner.HBaseHandler {
+
+ /**
+ * Whether requests should be redirected to other RegionServers if the
+ * specified region is not hosted by this RegionServer.
+ */
+ private boolean redirect;
HBaseHandlerRegion(final Configuration conf) throws IOException {
super(conf);
initialize(conf);
}
+ /**
+ * Read and initialize config parameters
+ */
+ private void initialize(Configuration conf) {
+ this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
+ false);
+ }
+
// TODO: Override more methods to short-circuit for performance
/**
@@ -119,7 +119,7 @@ public class HRegionThriftServer extends
try {
byte [] row = rowb.array();
HTable table = getTable(tableName.array());
- HRegionLocation location = table.getRegionLocation(row);
+ HRegionLocation location = table.getRegionLocation(row, false);
byte[] regionName = location.getRegionInfo().getRegionName();
if (columns == null) {
@@ -153,91 +153,4 @@ public class HRegionThriftServer extends
}
}
}
-
- /**
- * Read and initialize config parameters
- */
- private void initialize(Configuration conf) {
- this.port = conf.getInt("hbase.regionserver.thrift.port",
- DEFAULT_LISTEN_PORT);
- this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress");
- this.protocol = conf.get("hbase.regionserver.thrift.protocol");
- this.transport = conf.get("hbase.regionserver.thrift.transport");
- this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking",
- false);
- this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
- false);
- }
-
- /**
- * Stop ThriftServer
- */
- void shutdown() {
- if (tserver != null) {
- tserver.stop();
- tserver = null;
- }
- }
-
- @Override
- public void run() {
- try {
- HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf);
- Hbase.Processor<HBaseHandlerRegion> processor =
- new Hbase.Processor<HBaseHandlerRegion>(handler);
-
- TProtocolFactory protocolFactory;
- if (this.protocol != null && this.protocol.equals("compact")) {
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- protocolFactory = new TBinaryProtocol.Factory();
- }
-
- if (this.nonblocking) {
- TNonblockingServerTransport serverTransport =
- new TNonblockingServerSocket(this.port);
- TFramedTransport.Factory transportFactory =
- new TFramedTransport.Factory();
-
- TNonblockingServer.Args serverArgs =
- new TNonblockingServer.Args(serverTransport);
- serverArgs.processor(processor);
- serverArgs.transportFactory(transportFactory);
- serverArgs.protocolFactory(protocolFactory);
- LOG.info("starting HRegionServer Nonblocking Thrift server on " +
- this.port);
- LOG.info("HRegionServer Nonblocking Thrift server does not " +
- "support address binding.");
- tserver = new TNonblockingServer(serverArgs);
- } else {
- InetAddress listenAddress = null;
- if (this.bindIpAddress != null) {
- listenAddress = InetAddress.getByName(this.bindIpAddress);
- } else {
- listenAddress = InetAddress.getLocalHost();
- }
- TServerTransport serverTransport = new TServerSocket(
- new InetSocketAddress(listenAddress, port));
-
- TTransportFactory transportFactory;
- if (this.transport != null && this.transport.equals("framed")) {
- transportFactory = new TFramedTransport.Factory();
- } else {
- transportFactory = new TTransportFactory();
- }
-
- TBoundedThreadPoolServer.Args serverArgs =
- new TBoundedThreadPoolServer.Args(serverTransport, conf);
- serverArgs.processor(processor);
- serverArgs.protocolFactory(protocolFactory);
- serverArgs.transportFactory(transportFactory);
- LOG.info("starting HRegionServer ThreadPool Thrift server on " +
- listenAddress + ":" + this.port);
- tserver = new TBoundedThreadPoolServer(serverArgs);
- }
- tserver.serve();
- } catch (Exception e) {
- LOG.warn("Unable to start HRegionServerThrift interface.", e);
- }
- }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1232504&r1=1232503&r2=1232504&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Tue Jan 17 17:38:58 2012
@@ -18,89 +18,26 @@
package org.apache.hadoop.hbase.thrift;
-import static org.apache.hadoop.hbase.util.Bytes.getBytes;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
-import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
-import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
-import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface;
-import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor;
-import org.apache.hadoop.hbase.thrift.generated.IOError;
-import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
-import org.apache.hadoop.hbase.thrift.generated.Mutation;
-import org.apache.hadoop.hbase.thrift.generated.TCell;
-import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
-import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.generated.TScan;
-import org.apache.hadoop.hbase.util.Addressing;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
import org.apache.hadoop.hbase.util.VersionInfo;
-import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.Shell.ExitCodeException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServer.AbstractServerArgs;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
-
-import com.google.common.base.Joiner;
/**
- * ThriftServer - this class starts up a Thrift server which implements the
- * Hbase API specified in the Hbase.thrift IDL file.
+ * ThriftServer- this class starts up a Thrift server which implements the
+ * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
+ * independent process.
*/
public class ThriftServer {
@@ -119,968 +56,16 @@ public class ThriftServer {
private static final int DEFAULT_LISTEN_PORT = 9090;
private Configuration conf;
- TServer server;
-
- /** An enum of server implementation selections */
- enum ImplType {
- HS_HA("hsha", true, THsHaServer.class, false),
- NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
- THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true);
-
- public static final ImplType DEFAULT = THREAD_POOL;
-
- final String option;
- final boolean isAlwaysFramed;
- final Class<? extends TServer> serverClass;
- final boolean canSpecifyBindIP;
-
- ImplType(String option, boolean isAlwaysFramed,
- Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
- this.option = option;
- this.isAlwaysFramed = isAlwaysFramed;
- this.serverClass = serverClass;
- this.canSpecifyBindIP = canSpecifyBindIP;
- }
-
- /**
- * @return <code>-option</code> so we can get the list of options from
- * {@link #values()}
- */
- @Override
- public String toString() {
- return "-" + option;
- }
-
- String getDescription() {
- StringBuilder sb = new StringBuilder("Use the " +
- serverClass.getSimpleName());
- if (isAlwaysFramed) {
- sb.append(" This implies the framed transport.");
- }
- if (this == DEFAULT) {
- sb.append("This is the default.");
- }
- return sb.toString();
- }
-
- static OptionGroup createOptionGroup() {
- OptionGroup group = new OptionGroup();
- for (ImplType t : values()) {
- group.addOption(new Option(t.option, t.getDescription()));
- }
- return group;
- }
-
- static ImplType getServerImpl(CommandLine cmd) {
- ImplType chosenType = null;
- int numChosen = 0;
- for (ImplType t : values()) {
- if (cmd.hasOption(t.option)) {
- chosenType = t;
- ++numChosen;
- }
- }
- if (numChosen != 1) {
- throw new AssertionError("Exactly one option out of " +
- Arrays.toString(values()) + " has to be specified");
- }
- return chosenType;
- }
-
- public String simpleClassName() {
- return serverClass.getSimpleName();
- }
-
- public static List<String> serversThatCannotSpecifyBindIP() {
- List<String> l = new ArrayList<String>();
- for (ImplType t : values()) {
- if (!t.canSpecifyBindIP) {
- l.add(t.simpleClassName());
- }
- }
- return l;
- }
-
- }
-
- /**
- * The HBaseHandler is a glue object that connects Thrift RPC calls to the
- * HBase client API primarily defined in the HBaseAdmin and HTable objects.
- */
- public static class HBaseHandler implements Hbase.Iface {
- protected Configuration conf;
- protected HBaseAdmin admin = null;
- protected final Log LOG = LogFactory.getLog(this.getClass().getName());
-
- // nextScannerId and scannerMap are used to manage scanner state
- protected int nextScannerId = 0;
- protected HashMap<Integer, ResultScanner> scannerMap = null;
-
- private static ThreadLocal<Map<String, HTable>> threadLocalTables = new ThreadLocal<Map<String, HTable>>() {
- @Override
- protected Map<String, HTable> initialValue() {
- return new TreeMap<String, HTable>();
- }
- };
-
- /**
- * Returns a list of all the column families for a given htable.
- *
- * @param table
- * @return
- * @throws IOException
- */
- byte[][] getAllColumns(HTable table) throws IOException {
- HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
- byte[][] columns = new byte[cds.length][];
- for (int i = 0; i < cds.length; i++) {
- columns[i] = Bytes.add(cds[i].getName(),
- KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
- }
- return columns;
- }
-
- /**
- * Creates and returns an HTable instance from a given table name.
- *
- * @param tableName
- * name of table
- * @return HTable object
- * @throws IOException
- * @throws IOError
- */
- protected HTable getTable(final byte[] tableName) throws
- IOException {
- String table = new String(tableName);
- Map<String, HTable> tables = threadLocalTables.get();
- if (!tables.containsKey(table)) {
- tables.put(table, new HTable(conf, tableName));
- }
- return tables.get(table);
- }
-
- protected HTable getTable(final ByteBuffer tableName) throws IOException {
- return getTable(getBytes(tableName));
- }
-
- /**
- * Assigns a unique ID to the scanner and adds the mapping to an internal
- * hash-map.
- *
- * @param scanner
- * @return integer scanner id
- */
- protected synchronized int addScanner(ResultScanner scanner) {
- int id = nextScannerId++;
- scannerMap.put(id, scanner);
- return id;
- }
-
- /**
- * Returns the scanner associated with the specified ID.
- *
- * @param id
- * @return a Scanner, or null if ID was invalid.
- */
- protected synchronized ResultScanner getScanner(int id) {
- return scannerMap.get(id);
- }
-
- /**
- * Removes the scanner associated with the specified ID from the internal
- * id->scanner hash-map.
- *
- * @param id
- * @return a Scanner, or null if ID was invalid.
- */
- protected synchronized ResultScanner removeScanner(int id) {
- return scannerMap.remove(id);
- }
-
- /**
- * Constructs an HBaseHandler object.
- * @throws IOException
- */
- protected HBaseHandler()
- throws IOException {
- this(HBaseConfiguration.create());
- }
-
- protected HBaseHandler(final Configuration c)
- throws IOException {
- this.conf = c;
- admin = new HBaseAdmin(conf);
- scannerMap = new HashMap<Integer, ResultScanner>();
- }
-
- @Override
- public void enableTable(ByteBuffer tableName) throws IOError {
- try{
- admin.enableTable(getBytes(tableName));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public void disableTable(ByteBuffer tableName) throws IOError{
- try{
- admin.disableTable(getBytes(tableName));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
- try {
- return HTable.isTableEnabled(this.conf, getBytes(tableName));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
- try{
- admin.compact(getBytes(tableNameOrRegionName));
- } catch (InterruptedException e) {
- throw new IOError(e.getMessage());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
- try{
- admin.majorCompact(getBytes(tableNameOrRegionName));
- } catch (InterruptedException e) {
- throw new IOError(e.getMessage());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public List<ByteBuffer> getTableNames() throws IOError {
- try {
- HTableDescriptor[] tables = this.admin.listTables();
- ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tables.length);
- for (int i = 0; i < tables.length; i++) {
- list.add(ByteBuffer.wrap(tables[i].getName()));
- }
- return list;
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
- throws IOError {
- try{
- List<HRegionInfo> hris = this.admin.getTableRegions(tableName.array());
- List<TRegionInfo> regions = new ArrayList<TRegionInfo>();
-
- if (hris != null) {
- for (HRegionInfo regionInfo : hris){
- TRegionInfo region = new TRegionInfo();
- region.startKey = ByteBuffer.wrap(regionInfo.getStartKey());
- region.endKey = ByteBuffer.wrap(regionInfo.getEndKey());
- region.id = regionInfo.getRegionId();
- region.name = ByteBuffer.wrap(regionInfo.getRegionName());
- region.version = regionInfo.getVersion();
- regions.add(region);
- }
- }
- return regions;
- } catch (IOException e){
- throw new IOError(e.getMessage());
- }
- }
-
- @Deprecated
- @Override
- public List<TCell> get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
- throws IOError {
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if(famAndQf.length == 1) {
- return get(tableName, row, famAndQf[0], new byte[0]);
- }
- return get(tableName, row, famAndQf[0], famAndQf[1]);
- }
-
- protected List<TCell> get(ByteBuffer tableName,
- ByteBuffer row,
- byte[] family,
- byte[] qualifier) throws IOError {
- try {
- HTable table = getTable(tableName);
- Get get = new Get(getBytes(row));
- if (qualifier == null || qualifier.length == 0) {
- get.addFamily(family);
- } else {
- get.addColumn(family, qualifier);
- }
- Result result = table.get(get);
- return ThriftUtilities.cellFromHBase(result.raw());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Deprecated
- @Override
- public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
- ByteBuffer column, int numVersions) throws IOError {
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if(famAndQf.length == 1) {
- return getVer(tableName, row, famAndQf[0],
- new byte[0], numVersions);
- }
- return getVer(tableName, row,
- famAndQf[0], famAndQf[1], numVersions);
- }
-
- public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
- byte[] family,
- byte[] qualifier, int numVersions) throws IOError {
- try {
- HTable table = getTable(tableName);
- Get get = new Get(getBytes(row));
- get.addColumn(family, qualifier);
- get.setMaxVersions(numVersions);
- Result result = table.get(get);
- return ThriftUtilities.cellFromHBase(result.raw());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Deprecated
- @Override
- public List<TCell> getVerTs(ByteBuffer tableName,
- ByteBuffer row,
- ByteBuffer column,
- long timestamp,
- int numVersions) throws IOError {
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if(famAndQf.length == 1) {
- return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
- numVersions);
- }
- return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
- numVersions);
- }
-
- protected List<TCell> getVerTs(ByteBuffer tableName,
- ByteBuffer row, byte [] family,
- byte [] qualifier, long timestamp, int numVersions) throws IOError {
- try {
- HTable table = getTable(tableName);
- Get get = new Get(getBytes(row));
- get.addColumn(family, qualifier);
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- get.setMaxVersions(numVersions);
- Result result = table.get(get);
- return ThriftUtilities.cellFromHBase(result.raw());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row)
- throws IOError {
- return getRowWithColumnsTs(tableName, row, null,
- HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
- ByteBuffer row,
- List<ByteBuffer> columns) throws IOError {
- return getRowWithColumnsTs(tableName, row, columns,
- HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
- long timestamp) throws IOError {
- return getRowWithColumnsTs(tableName, row, null,
- timestamp);
- }
-
- @Override
- public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
- List<ByteBuffer> columns, long timestamp) throws IOError {
- try {
- HTable table = getTable(tableName);
- if (columns == null) {
- Get get = new Get(getBytes(row));
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- Result result = table.get(get);
- return ThriftUtilities.rowResultFromHBase(result);
- }
- Get get = new Get(getBytes(row));
- for(ByteBuffer column : columns) {
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if (famAndQf.length == 1) {
- get.addFamily(famAndQf[0]);
- } else {
- get.addColumn(famAndQf[0], famAndQf[1]);
- }
- }
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- Result result = table.get(get);
- return ThriftUtilities.rowResultFromHBase(result);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public List<TRowResult> getRows(ByteBuffer tableName,
- List<ByteBuffer> rows)
- throws IOError {
- return getRowsWithColumnsTs(tableName, rows, null,
- HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
- List<ByteBuffer> rows,
- List<ByteBuffer> columns) throws IOError {
- return getRowsWithColumnsTs(tableName, rows, columns,
- HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public List<TRowResult> getRowsTs(ByteBuffer tableName,
- List<ByteBuffer> rows,
- long timestamp) throws IOError {
- return getRowsWithColumnsTs(tableName, rows, null,
- timestamp);
- }
-
- @Override
- public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
- List<ByteBuffer> rows,
- List<ByteBuffer> columns, long timestamp) throws IOError {
- try {
- List<Get> gets = new ArrayList<Get>(rows.size());
- HTable table = getTable(tableName);
- for (ByteBuffer row : rows) {
- Get get = new Get(getBytes(row));
- if (columns != null) {
-
- for(ByteBuffer column : columns) {
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if (famAndQf.length == 1) {
- get.addFamily(famAndQf[0]);
- } else {
- get.addColumn(famAndQf[0], famAndQf[1]);
- }
- }
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- }
- gets.add(get);
- }
- Result[] result = table.get(gets);
- return ThriftUtilities.rowResultFromHBase(result);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
- throws IOError {
- deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public void deleteAllTs(ByteBuffer tableName,
- ByteBuffer row,
- ByteBuffer column,
- long timestamp) throws IOError {
- try {
- HTable table = getTable(tableName);
- Delete delete = new Delete(getBytes(row));
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- table.delete(delete);
+ ThriftServerRunner serverRunner;
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public void deleteAllRow(ByteBuffer tableName, ByteBuffer row) throws IOError {
- deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp)
- throws IOError {
- try {
- HTable table = getTable(tableName);
- Delete delete = new Delete(getBytes(row), timestamp, null);
- table.delete(delete);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public void createTable(ByteBuffer in_tableName,
- List<ColumnDescriptor> columnFamilies) throws IOError,
- IllegalArgument, AlreadyExists {
- byte [] tableName = getBytes(in_tableName);
- try {
- if (admin.tableExists(tableName)) {
- throw new AlreadyExists("table name already in use");
- }
- HTableDescriptor desc = new HTableDescriptor(tableName);
- for (ColumnDescriptor col : columnFamilies) {
- HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
- desc.addFamily(colDesc);
- }
- admin.createTable(desc);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgument(e.getMessage());
- }
- }
-
- @Override
- public void deleteTable(ByteBuffer in_tableName) throws IOError {
- byte [] tableName = getBytes(in_tableName);
- if (LOG.isDebugEnabled()) {
- LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
- }
- try {
- if (!admin.tableExists(tableName)) {
- throw new IOError("table does not exist");
- }
- admin.deleteTable(tableName);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public void mutateRow(ByteBuffer tableName, ByteBuffer row,
- List<Mutation> mutations) throws IOError, IllegalArgument {
- mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
- List<Mutation> mutations, long timestamp) throws IOError, IllegalArgument {
- HTable table = null;
- try {
- table = getTable(tableName);
- Put put = new Put(getBytes(row), timestamp, null);
-
- Delete delete = new Delete(getBytes(row));
-
- // I apologize for all this mess :)
- for (Mutation m : mutations) {
- byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
- if (m.isDelete) {
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- } else {
- if(famAndQf.length == 1) {
- put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
- m.value != null ? m.value.array()
- : HConstants.EMPTY_BYTE_ARRAY);
- } else {
- put.add(famAndQf[0], famAndQf[1],
- m.value != null ? m.value.array()
- : HConstants.EMPTY_BYTE_ARRAY);
- }
- }
- }
- if (!delete.isEmpty())
- table.delete(delete);
- if (!put.isEmpty())
- table.put(put);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgument(e.getMessage());
- }
- }
-
- @Override
- public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches)
- throws IOError, IllegalArgument, TException {
- mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public void mutateRowsTs(ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp)
- throws IOError, IllegalArgument, TException {
- List<Put> puts = new ArrayList<Put>();
- List<Delete> deletes = new ArrayList<Delete>();
-
- for (BatchMutation batch : rowBatches) {
- byte[] row = getBytes(batch.row);
- List<Mutation> mutations = batch.mutations;
- Delete delete = new Delete(row);
- Put put = new Put(row, timestamp, null);
- for (Mutation m : mutations) {
- byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
- if (m.isDelete) {
- // no qualifier, family only.
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- } else {
- if(famAndQf.length == 1) {
- put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
- m.value != null ? m.value.array()
- : HConstants.EMPTY_BYTE_ARRAY);
- } else {
- put.add(famAndQf[0], famAndQf[1],
- m.value != null ? m.value.array()
- : HConstants.EMPTY_BYTE_ARRAY);
- }
- }
- }
- if (!delete.isEmpty())
- deletes.add(delete);
- if (!put.isEmpty())
- puts.add(put);
- }
-
- HTable table = null;
- try {
- table = getTable(tableName);
- if (!puts.isEmpty())
- table.put(puts);
- for (Delete del : deletes) {
- table.delete(del);
- }
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgument(e.getMessage());
- }
- }
-
- @Deprecated
- @Override
- public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
- long amount) throws IOError, IllegalArgument, TException {
- byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
- if(famAndQf.length == 1) {
- return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
- amount);
- }
- return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
- }
-
- protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte [] family,
- byte [] qualifier, long amount)
- throws IOError, IllegalArgument, TException {
- HTable table;
- try {
- table = getTable(tableName);
- return table.incrementColumnValue(getBytes(row), family, qualifier, amount);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void scannerClose(int id) throws IOError, IllegalArgument {
- LOG.debug("scannerClose: id=" + id);
- ResultScanner scanner = getScanner(id);
- if (scanner == null) {
- throw new IllegalArgument("scanner ID is invalid");
- }
- scanner.close();
- removeScanner(id);
- }
-
- @Override
- public List<TRowResult> scannerGetList(int id,int nbRows) throws IllegalArgument, IOError {
- LOG.debug("scannerGetList: id=" + id);
- ResultScanner scanner = getScanner(id);
- if (null == scanner) {
- throw new IllegalArgument("scanner ID is invalid");
- }
-
- Result [] results = null;
- try {
- results = scanner.next(nbRows);
- if (null == results) {
- return new ArrayList<TRowResult>();
- }
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- return ThriftUtilities.rowResultFromHBase(results);
- }
-
- @Override
- public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
- return scannerGetList(id,1);
- }
-
- public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan();
- if (tScan.isSetStartRow()) {
- scan.setStartRow(tScan.getStartRow());
- }
- if (tScan.isSetStopRow()) {
- scan.setStopRow(tScan.getStopRow());
- }
- if (tScan.isSetTimestamp()) {
- scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
- }
- if (tScan.isSetCaching()) {
- scan.setCaching(tScan.getCaching());
- }
- if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
- for(ByteBuffer column : tScan.getColumns()) {
- byte [][] famQf = KeyValue.parseColumn(getBytes(column));
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- if (tScan.isSetFilterString()) {
- ParseFilter parseFilter = new ParseFilter();
- scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
- List<ByteBuffer> columns) throws IOError {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(getBytes(startRow));
- if(columns != null && columns.size() != 0) {
- for(ByteBuffer column : columns) {
- byte [][] famQf = KeyValue.parseColumn(getBytes(column));
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
- ByteBuffer stopRow, List<ByteBuffer> columns) throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
- if(columns != null && columns.size() != 0) {
- for(ByteBuffer column : columns) {
- byte [][] famQf = KeyValue.parseColumn(getBytes(column));
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public int scannerOpenWithPrefix(ByteBuffer tableName,
- ByteBuffer startAndPrefix,
- List<ByteBuffer> columns)
- throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(getBytes(startAndPrefix));
- Filter f = new WhileMatchFilter(
- new PrefixFilter(getBytes(startAndPrefix)));
- scan.setFilter(f);
- if (columns != null && columns.size() != 0) {
- for(ByteBuffer column : columns) {
- byte [][] famQf = KeyValue.parseColumn(getBytes(column));
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
- List<ByteBuffer> columns, long timestamp) throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(getBytes(startRow));
- scan.setTimeRange(Long.MIN_VALUE, timestamp);
- if (columns != null && columns.size() != 0) {
- for (ByteBuffer column : columns) {
- byte [][] famQf = KeyValue.parseColumn(getBytes(column));
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
- ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp)
- throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
- scan.setTimeRange(Long.MIN_VALUE, timestamp);
- if (columns != null && columns.size() != 0) {
- for (ByteBuffer column : columns) {
- byte [][] famQf = KeyValue.parseColumn(getBytes(column));
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- scan.setTimeRange(Long.MIN_VALUE, timestamp);
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
- ByteBuffer tableName) throws IOError, TException {
- try {
- TreeMap<ByteBuffer, ColumnDescriptor> columns =
- new TreeMap<ByteBuffer, ColumnDescriptor>();
-
- HTable table = getTable(tableName);
- HTableDescriptor desc = table.getTableDescriptor();
-
- for (HColumnDescriptor e : desc.getFamilies()) {
- ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
- columns.put(col.name, col);
- }
- return columns;
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
- ByteBuffer family) throws IOError {
- try {
- HTable table = getTable(getBytes(tableName));
- Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
- return ThriftUtilities.cellFromHBase(result.raw());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
- try {
- HTable table = getTable(HConstants.META_TABLE_NAME);
- Result startRowResult = table.getRowOrBefore(
- searchRow.array(), HConstants.CATALOG_FAMILY);
-
- if (startRowResult == null) {
- throw new IOException("Cannot find row in .META., row="
- + Bytes.toString(searchRow.array()));
- }
-
- // find region start and end keys
- byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- if (value == null || value.length == 0) {
- throw new IOException("HRegionInfo REGIONINFO was null or " +
- " empty in Meta for row="
- + Bytes.toString(searchRow.array()));
- }
- HRegionInfo regionInfo = Writables.getHRegionInfo(value);
- TRegionInfo region = new TRegionInfo();
- region.setStartKey(regionInfo.getStartKey());
- region.setEndKey(regionInfo.getEndKey());
- region.id = regionInfo.getRegionId();
- region.setName(regionInfo.getRegionName());
- region.version = regionInfo.getVersion();
-
- // find region assignment to server
- value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER);
- if (value != null && value.length > 0) {
- String hostAndPort = Bytes.toString(value);
- region.setServerName(Bytes.toBytes(
- Addressing.parseHostname(hostAndPort)));
- region.port = Addressing.parsePort(hostAndPort);
- }
- return region;
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
- }
+ //
+ // Main program and support routines
+ //
public ThriftServer(Configuration conf) {
this.conf = HBaseConfiguration.create(conf);
}
- //
- // Main program and support routines
- //
-
private static void printUsageAndExit(Options options, int exitCode)
throws ExitCodeException {
HelpFormatter formatter = new HelpFormatter();
@@ -1092,11 +77,20 @@ public class ThriftServer {
throw new ExitCodeException(exitCode, "");
}
- /*
+ /**
* Start up or shuts down the Thrift server, depending on the arguments.
* @param args
*/
void doMain(final String[] args) throws Exception {
+ processOptions(args);
+ serverRunner = new ThriftServerRunner(conf);
+ serverRunner.run();
+ }
+
+ /**
+ * Parse the command line options to set parameters the conf.
+ */
+ private void processOptions(final String[] args) throws Exception {
Options options = new Options();
options.addOption("b", BIND_OPTION, true, "Address to bind " +
"the Thrift server to. Not supported by the Nonblocking and " +
@@ -1143,10 +137,10 @@ public class ThriftServer {
}
// Get port to bind to
- int listenPort = 0;
try {
- listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION,
+ int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION,
String.valueOf(DEFAULT_LISTEN_PORT)));
+ conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the port option", e);
printUsageAndExit(options, -1);
@@ -1162,131 +156,29 @@ public class ThriftServer {
optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
- // Construct correct ProtocolFactory
- TProtocolFactory protocolFactory;
- if (cmd.hasOption(COMPACT_OPTION)) {
- LOG.debug("Using compact protocol");
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- LOG.debug("Using binary protocol");
- protocolFactory = new TBinaryProtocol.Factory();
- }
-
- HBaseHandler handler = new HBaseHandler(conf);
- Hbase.Processor<Hbase.Iface> processor =
- new Hbase.Processor<Hbase.Iface>(handler);
- ImplType implType = ImplType.getServerImpl(cmd);
-
- // Construct correct TransportFactory
- TTransportFactory transportFactory;
- if (cmd.hasOption(FRAMED_OPTION) || implType.isAlwaysFramed) {
- transportFactory = new TFramedTransport.Factory();
- LOG.debug("Using framed transport");
- } else {
- transportFactory = new TTransportFactory();
- }
-
- if (cmd.hasOption(BIND_OPTION) && !implType.canSpecifyBindIP) {
- LOG.error("Server types " + Joiner.on(", ").join(
- ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
- "address binding at the moment. See " +
- "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
- printUsageAndExit(options, -1);
- }
-
- if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING) {
- if (cmd.hasOption(BIND_OPTION)) {
- throw new RuntimeException("-" + BIND_OPTION + " not supported with " +
- implType);
- }
-
- TNonblockingServerTransport serverTransport =
- new TNonblockingServerSocket(listenPort);
-
- if (implType == ImplType.NONBLOCKING) {
- TNonblockingServer.Args serverArgs =
- new TNonblockingServer.Args(serverTransport);
- setServerArgs(serverArgs, processor, transportFactory,
- protocolFactory);
- server = new TNonblockingServer(serverArgs);
- } else {
- THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
- serverArgs.processor(processor);
- serverArgs.transportFactory(transportFactory);
- serverArgs.protocolFactory(protocolFactory);
- server = new THsHaServer(serverArgs);
- }
- LOG.info("starting HBase " + implType.simpleClassName() +
- " server on " + Integer.toString(listenPort));
- } else if (implType == ImplType.THREAD_POOL) {
- // Thread pool server. Get the IP address to bind to.
- InetAddress listenAddress = getBindAddress(options, cmd);
-
- TServerTransport serverTransport = new TServerSocket(
- new InetSocketAddress(listenAddress, listenPort));
-
- TBoundedThreadPoolServer.Args serverArgs = new TBoundedThreadPoolServer.Args(
- serverTransport, conf);
- setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
- LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
- + listenAddress + ":" + Integer.toString(listenPort)
- + "; " + serverArgs);
- server = new TBoundedThreadPoolServer(serverArgs);
- } else {
- throw new AssertionError("Unsupported Thrift server implementation: " +
- implType.simpleClassName());
- }
-
- // A sanity check that we instantiated the right type of server.
- if (server.getClass() != implType.serverClass) {
- throw new AssertionError("Expected to create Thrift server class " +
- implType.serverClass.getName() + " but got " +
- server.getClass().getName());
- }
-
- // login the server principal (if using secure Hadoop)
- Configuration conf = handler.conf;
- if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) {
- String machineName = Strings.domainNamePointerToHostName(
- DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"),
- conf.get("hbase.thrift.dns.nameserver", "default")));
- User.login(conf, "hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal",
- machineName);
+ // Set general thrift server options
+ conf.setBoolean(
+ ThriftServerRunner.COMPACT_CONF_KEY, cmd.hasOption(COMPACT_OPTION));
+ conf.setBoolean(
+ ThriftServerRunner.FRAMED_CONF_KEY, cmd.hasOption(FRAMED_OPTION));
+ if (cmd.hasOption(BIND_OPTION)) {
+ conf.set(
+ ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
}
- server.serve();
+ ImplType.setServerImpl(cmd, conf);
}
public void stop() {
- server.stop();
- }
-
- private InetAddress getBindAddress(Options options, CommandLine cmd)
- throws ExitCodeException {
- InetAddress listenAddress = null;
- String bindAddressStr = cmd.getOptionValue(BIND_OPTION, DEFAULT_BIND_ADDR);
- try {
- listenAddress = InetAddress.getByName(bindAddressStr);
- } catch (UnknownHostException e) {
- LOG.error("Could not resolve the bind address specified: " +
- bindAddressStr, e);
- printUsageAndExit(options, -1);
- }
- return listenAddress;
- }
-
- private static void setServerArgs(AbstractServerArgs<?> serverArgs,
- Processor<Iface> processor, TTransportFactory transportFactory,
- TProtocolFactory protocolFactory) {
- serverArgs.processor(processor);
- serverArgs.transportFactory(transportFactory);
- serverArgs.protocolFactory(protocolFactory);
+ serverRunner.shutdown();
}
private static void optionToConf(CommandLine cmd, String option,
Configuration conf, String destConfKey) {
if (cmd.hasOption(option)) {
- conf.set(destConfKey, cmd.getOptionValue(option));
+ String value = cmd.getOptionValue(option);
+ LOG.info("Set configuration key:" + destConfKey + " value:" + value);
+ conf.set(destConfKey, value);
}
}
@@ -1302,5 +194,4 @@ public class ThriftServer {
System.exit(ex.getExitCode());
}
}
-
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java?rev=1232504&r1=1232503&r2=1232504&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServer.java Tue Jan 17 17:38:58 2012
@@ -40,7 +40,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
- * Unit testing for ThriftServer.HBaseHandler, a part of the
+ * Unit testing for ThriftServerRunner.HBaseHandler, a part of the
* org.apache.hadoop.hbase.thrift package.
*/
@Category(MediumTests.class)
@@ -100,8 +100,8 @@ public class TestThriftServer {
*/
@Test
public void doTestTableCreateDrop() throws Exception {
- ThriftServer.HBaseHandler handler =
- new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+ ThriftServerRunner.HBaseHandler handler =
+ new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
createTestTables(handler);
dropTestTables(handler);
}
@@ -140,8 +140,8 @@ public class TestThriftServer {
*/
public void doTestTableMutations() throws Exception {
// Setup
- ThriftServer.HBaseHandler handler =
- new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+ ThriftServerRunner.HBaseHandler handler =
+ new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
handler.createTable(tableAname, getColumnDescriptors());
// Apply a few Mutations to rowA
@@ -213,8 +213,8 @@ public class TestThriftServer {
*/
public void doTestTableTimestampsAndColumns() throws Exception {
// Setup
- ThriftServer.HBaseHandler handler =
- new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+ ThriftServerRunner.HBaseHandler handler =
+ new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
handler.createTable(tableAname, getColumnDescriptors());
// Apply timestamped Mutations to rowA
@@ -292,8 +292,8 @@ public class TestThriftServer {
*/
public void doTestTableScanners() throws Exception {
// Setup
- ThriftServer.HBaseHandler handler =
- new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+ ThriftServerRunner.HBaseHandler handler =
+ new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
handler.createTable(tableAname, getColumnDescriptors());
// Apply timestamped Mutations to rowA
@@ -360,8 +360,8 @@ public class TestThriftServer {
* @throws Exception
*/
public void doTestGetTableRegions() throws Exception {
- ThriftServer.HBaseHandler handler =
- new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+ ThriftServerRunner.HBaseHandler handler =
+ new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
handler.createTable(tableAname, getColumnDescriptors());
int regionCount = handler.getTableRegions(tableAname).size();
assertEquals("empty table should have only 1 region, " +
@@ -456,7 +456,8 @@ public class TestThriftServer {
* @param handler the HBaseHandler interfacing to HBase
* @throws Exception
*/
- private void closeScanner(int scannerId, ThriftServer.HBaseHandler handler) throws Exception {
+ private void closeScanner(
+ int scannerId, ThriftServerRunner.HBaseHandler handler) throws Exception {
handler.scannerGet(scannerId);
handler.scannerClose(scannerId);
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java?rev=1232504&r1=1232503&r2=1232504&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java Tue Jan 17 17:38:58 2012
@@ -31,8 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.thrift.ThriftServer.ImplType;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -82,7 +81,7 @@ public class TestThriftServerCmdLine {
@Parameters
public static Collection<Object[]> getParameters() {
Collection<Object[]> parameters = new ArrayList<Object[]>();
- for (ThriftServer.ImplType implType : ThriftServer.ImplType.values()) {
+ for (ImplType implType : ImplType.values()) {
for (boolean specifyFramed : new boolean[] {false, true}) {
for (boolean specifyBindIP : new boolean[] {false, true}) {
if (specifyBindIP && !implType.canSpecifyBindIP) {
@@ -167,6 +166,11 @@ public class TestThriftServerCmdLine {
startCmdLineThread(args.toArray(new String[0]));
Threads.sleepWithoutInterrupt(2000);
+ Class<? extends TServer> expectedClass = implType != null ?
+ implType.serverClass : TBoundedThreadPoolServer.class;
+ assertEquals(expectedClass,
+ thriftServer.serverRunner.tserver.getClass());
+
try {
talkToThriftServer();
} catch (Exception ex) {
@@ -175,14 +179,6 @@ public class TestThriftServerCmdLine {
stopCmdLineThread();
}
- Class<? extends TServer> expectedClass;
- if (implType != null) {
- expectedClass = implType.serverClass;
- } else {
- expectedClass = TBoundedThreadPoolServer.class;
- }
- assertEquals(expectedClass, thriftServer.server.getClass());
-
if (clientSideException != null) {
LOG.error("Thrift client threw an exception", clientSideException);
throw new Exception(clientSideException);