You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/04/21 03:37:16 UTC
svn commit: r1328562 [2/9] - in /hbase/branches/0.89-fb: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/filter/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/thrift/ src/main/...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1328562&r1=1328561&r2=1328562&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Sat Apr 21 01:37:15 2012
@@ -18,1207 +18,167 @@
package org.apache.hadoop.hbase.thrift;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
-import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
-import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
-import org.apache.hadoop.hbase.thrift.generated.IOError;
-import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
-import org.apache.hadoop.hbase.thrift.generated.Mutation;
-import org.apache.hadoop.hbase.thrift.generated.TCell;
-import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
-import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.generated.TScan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+
+import com.google.common.base.Joiner;
/**
- * ThriftServer - this class starts up a Thrift server which implements the
- * Hbase API specified in the Hbase.thrift IDL file.
+ * ThriftServer- this class starts up a Thrift server which implements the
+ * Hbase API specified in the Hbase.thrift IDL file. The server runs in an
+ * independent process.
*/
public class ThriftServer {
- public static final Class<? extends TServer>
- THREAD_POOL_SERVER_CLASS = TBoundedThreadPoolServer.class;
-
- /**
- * The HBaseHandler is a glue object that connects Thrift RPC calls to the
- * HBase client API primarily defined in the HBaseAdmin and HTable objects.
- */
- public static class HBaseHandler implements Hbase.Iface {
- protected Configuration conf;
- protected HBaseAdmin admin = null;
- protected final Log LOG = LogFactory.getLog(this.getClass().getName());
-
- // nextScannerId and scannerMap are used to manage scanner state
- protected int nextScannerId = 0;
- protected HashMap<Integer, ResultScanner> scannerMap = null;
-
- final private ThriftMetrics metrics;
-
- private static ThreadLocal<Map<String, HTable>> threadLocalTables = new ThreadLocal<Map<String, HTable>>() {
- @Override
- protected Map<String, HTable> initialValue() {
- return new TreeMap<String, HTable>();
- }
-
- };
-
- /**
- * Returns a list of all the column families for a given htable.
- *
- * @param table
- * @return
- * @throws IOException
- */
- byte[][] getAllColumns(HTable table) throws IOException {
- HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
- byte[][] columns = new byte[cds.length][];
- for (int i = 0; i < cds.length; i++) {
- columns[i] = Bytes.add(cds[i].getName(),
- KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
- }
- return columns;
- }
-
- /**
- * Creates and returns an HTable instance from a given table name.
- *
- * @param tableName
- * name of table
- * @return HTable object
- * @throws IOException
- * @throws IOError
- */
- protected HTable getTable(final byte[] tableName) throws IOError,
- IOException {
- String table = new String(tableName);
- Map<String, HTable> tables = threadLocalTables.get();
- if (!tables.containsKey(table)) {
- tables.put(table, new HTable(conf, tableName));
- }
- return tables.get(table);
- }
-
- /**
- * Assigns a unique ID to the scanner and adds the mapping to an internal
- * hash-map.
- *
- * @param scanner
- * @return integer scanner id
- */
- protected synchronized int addScanner(ResultScanner scanner) {
- int id = nextScannerId++;
- scannerMap.put(id, scanner);
- return id;
- }
-
- /**
- * Returns the scanner associated with the specified ID.
- *
- * @param id
- * @return a Scanner, or null if ID was invalid.
- */
- protected synchronized ResultScanner getScanner(int id) {
- return scannerMap.get(id);
- }
-
- /**
- * Removes the scanner associated with the specified ID from the internal
- * id->scanner hash-map.
- *
- * @param id
- * @return a Scanner, or null if ID was invalid.
- */
- protected synchronized ResultScanner removeScanner(int id) {
- return scannerMap.remove(id);
- }
-
- /** Constructs a handler with configuration based on the given one. */
- protected HBaseHandler(
- HBaseConfiguration conf, ThriftMetrics metrics) throws IOException {
- this(HBaseConfiguration.create(conf), metrics);
- LOG.debug("Creating HBaseHandler with ZK client port " +
- conf.get(HConstants.ZOOKEEPER_CLIENT_PORT));
- }
-
- protected HBaseHandler(
- final Configuration c, ThriftMetrics metrics) throws IOException {
- this.metrics = metrics;
- this.conf = c;
- admin = new HBaseAdmin(conf);
- scannerMap = new HashMap<Integer, ResultScanner>();
- }
-
- /** Create a handler without metrics. Used by unit test only */
- protected HBaseHandler(final Configuration c) throws IOException {
- this(c, null);
- }
-
- public void enableTable(final byte[] tableName) throws IOError {
- try{
- admin.enableTable(tableName);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void disableTable(final byte[] tableName) throws IOError{
- try{
- admin.disableTable(tableName);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public boolean isTableEnabled(final byte[] tableName) throws IOError {
- try {
- return HTable.isTableEnabled(conf, tableName);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void compact(byte[] tableNameOrRegionName) throws IOError {
- try{
- admin.compact(tableNameOrRegionName);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void majorCompact(byte[] tableNameOrRegionName) throws IOError {
- try{
- admin.majorCompact(tableNameOrRegionName);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public List<byte[]> getTableNames() throws IOError {
- try {
- HTableDescriptor[] tables = this.admin.listTables();
- ArrayList<byte[]> list = new ArrayList<byte[]>(tables.length);
- for (int i = 0; i < tables.length; i++) {
- list.add(tables[i].getName());
- }
- return list;
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public List<TRegionInfo> getTableRegions(byte[] tableName)
- throws IOError {
- try{
- HTable table = getTable(tableName);
- Map<HRegionInfo, HServerAddress> regionsInfo = table.getRegionsInfo();
- List<TRegionInfo> regions = new ArrayList<TRegionInfo>();
-
- for (HRegionInfo regionInfo : regionsInfo.keySet()){
- TRegionInfo region = new TRegionInfo();
- region.startKey = regionInfo.getStartKey();
- region.endKey = regionInfo.getEndKey();
- region.id = regionInfo.getRegionId();
- region.name = regionInfo.getRegionName();
- region.version = regionInfo.getVersion();
- HServerAddress server = regionsInfo.get(regionInfo);
- if (server != null) {
- byte[] hostname = Bytes.toBytes(server.getHostname());
- region.serverName = hostname;
- region.port = server.getPort();
- }
- regions.add(region);
- }
- return regions;
- } catch (IOException e){
- throw new IOError(e.getMessage());
- }
- }
-
- @Deprecated
- public List<TCell> get(byte[] tableName, byte[] row, byte[] column)
- throws IOError {
- byte [][] famAndQf = KeyValue.parseColumn(column);
- if(famAndQf.length == 1) {
- return get(tableName, row, famAndQf[0], new byte[0]);
- }
- return get(tableName, row, famAndQf[0], famAndQf[1]);
- }
-
- public List<TCell> get(byte [] tableName, byte [] row, byte [] family,
- byte [] qualifier) throws IOError {
- try {
- HTable table = getTable(tableName);
- Get get = new Get(row);
- if (qualifier == null || qualifier.length == 0) {
- get.addFamily(family);
- } else {
- get.addColumn(family, qualifier);
- }
- Result result = table.get(get);
- return ThriftUtilities.cellFromHBase(result.sorted());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Deprecated
- public List<TCell> getVer(byte[] tableName, byte[] row,
- byte[] column, int numVersions) throws IOError {
- byte [][] famAndQf = KeyValue.parseColumn(column);
- if(famAndQf.length == 1) {
- return getVer(tableName, row, famAndQf[0], new byte[0], numVersions);
- }
- return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions);
- }
-
- public List<TCell> getVer(byte [] tableName, byte [] row, byte [] family,
- byte [] qualifier, int numVersions) throws IOError {
- try {
- HTable table = getTable(tableName);
- Get get = new Get(row);
- get.addColumn(family, qualifier);
- get.setMaxVersions(numVersions);
- Result result = table.get(get);
- return ThriftUtilities.cellFromHBase(result.sorted());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Deprecated
- public List<TCell> getVerTs(byte[] tableName, byte[] row,
- byte[] column, long timestamp, int numVersions) throws IOError {
- byte [][] famAndQf = KeyValue.parseColumn(column);
- if(famAndQf.length == 1) {
- return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
- numVersions);
- }
- return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
- numVersions);
- }
-
- public List<TCell> getVerTs(byte [] tableName, byte [] row, byte [] family,
- byte [] qualifier, long timestamp, int numVersions) throws IOError {
- try {
- HTable table = getTable(tableName);
- Get get = new Get(row);
- get.addColumn(family, qualifier);
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- get.setMaxVersions(numVersions);
- Result result = table.get(get);
- return ThriftUtilities.cellFromHBase(result.sorted());
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public List<TRowResult> getRow(byte[] tableName, byte[] row)
- throws IOError {
- return getRowWithColumnsTs(tableName, row, null,
- HConstants.LATEST_TIMESTAMP);
- }
-
- public List<TRowResult> getRowWithColumns(byte[] tableName, byte[] row,
- List<byte[]> columns) throws IOError {
- return getRowWithColumnsTs(tableName, row, columns,
- HConstants.LATEST_TIMESTAMP);
- }
-
- public List<TRowResult> getRowTs(byte[] tableName, byte[] row,
- long timestamp) throws IOError {
- return getRowWithColumnsTs(tableName, row, null,
- timestamp);
- }
-
- public List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row,
- List<byte[]> columns, long timestamp) throws IOError {
- try {
- HTable table = getTable(tableName);
- if (columns == null) {
- Get get = new Get(row);
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- Result result = table.get(get);
- return ThriftUtilities.rowResultFromHBase(result);
- }
- byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
- Get get = new Get(row);
- for(byte [] column : columnArr) {
- byte [][] famAndQf = KeyValue.parseColumn(column);
- if (famAndQf.length == 1) {
- get.addFamily(famAndQf[0]);
- } else {
- get.addColumn(famAndQf[0], famAndQf[1]);
- }
- }
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- Result result = table.get(get);
- return ThriftUtilities.rowResultFromHBase(result);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public List<TRowResult> getRowWithColumnPrefix(byte[] tableName,
- byte[] row, byte[] prefix) throws IOError {
- return (getRowWithColumnPrefixTs(tableName, row, prefix,
- HConstants.LATEST_TIMESTAMP));
- }
-
- @Override
- public List<TRowResult> getRowWithColumnPrefixTs(byte[] tableName,
- byte[] row, byte[] prefix, long timestamp) throws IOError {
- try {
- HTable table = getTable(tableName);
- if (prefix == null) {
- Get get = new Get(row);
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- Result result = table.get(get);
- return ThriftUtilities.rowResultFromHBase(result);
- }
- Get get = new Get(row);
- byte [][] famAndPrefix = KeyValue.parseColumn(prefix);
- if (famAndPrefix.length == 2) {
- get.addFamily(famAndPrefix[0]);
- get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
- } else {
- get.setFilter(new ColumnPrefixFilter(famAndPrefix[0]));
- }
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- Result result = table.get(get);
- return ThriftUtilities.rowResultFromHBase(result);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void deleteAll(byte[] tableName, byte[] row, byte[] column)
- throws IOError {
- deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public List<TRowResult> getRows(byte[] tableName, List<byte[]> rows)
- throws IOError {
- return getRowsWithColumnsTs(tableName, rows, null,
- HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public List<TRowResult> getRowsWithColumns(byte[] tableName,
- List<byte[]> rows, List<byte[]> columns) throws IOError {
- return getRowsWithColumnsTs(tableName, rows, columns,
- HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public List<TRowResult> getRowsTs(byte[] tableName, List<byte[]> rows,
- long timestamp) throws IOError {
- return getRowsWithColumnsTs(tableName, rows, null, timestamp);
- }
-
- @Override
- public List<TRowResult> getRowsWithColumnsTs(byte[] tableName,
- List<byte[]> rows, List<byte[]> columns, long timestamp) throws IOError {
- try {
- List<Get> gets = new ArrayList<Get>(rows.size());
- HTable table = getTable(tableName);
- if (metrics != null) {
- metrics.incNumBatchGetRowKeys(rows.size());
- }
-
- // For now, don't support ragged gets, with different columns per row
- // Probably pretty sensible indefinitely anyways.
- for (byte[] row : rows) {
- Get get = new Get(row);
- if (columns != null) {
- for (byte[] column : columns) {
- byte[][] famAndQf = KeyValue.parseColumn(column);
- if (famAndQf.length == 1) {
- get.addFamily(famAndQf[0]);
- } else {
- get.addColumn(famAndQf[0], famAndQf[1]);
- }
- }
- get.setTimeRange(Long.MIN_VALUE, timestamp);
- }
- gets.add(get);
- }
- Result[] result = table.get(gets);
- return ThriftUtilities.rowResultFromHBase(result);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void deleteAllTs(byte[] tableName, byte[] row, byte[] column,
- long timestamp) throws IOError {
- try {
- HTable table = getTable(tableName);
- Delete delete = new Delete(row);
- byte [][] famAndQf = KeyValue.parseColumn(column);
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- table.delete(delete);
-
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void deleteAllRow(byte[] tableName, byte[] row) throws IOError {
- deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
- }
-
- public void deleteAllRowTs(byte[] tableName, byte[] row, long timestamp)
- throws IOError {
- try {
- HTable table = getTable(tableName);
- Delete delete = new Delete(row, timestamp, null);
- table.delete(delete);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void createTable(byte[] tableName,
- List<ColumnDescriptor> columnFamilies) throws IOError,
- IllegalArgument, AlreadyExists {
- try {
- if (admin.tableExists(tableName)) {
- throw new AlreadyExists("table name already in use");
- }
- HTableDescriptor desc = new HTableDescriptor(tableName);
- for (ColumnDescriptor col : columnFamilies) {
- HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
- desc.addFamily(colDesc);
- }
- admin.createTable(desc);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgument(e.getMessage());
- }
- }
+ public static final String CONF_PREFIX = HConstants.THRIFT_PROXY_PREFIX;
- public void deleteTable(byte[] tableName) throws IOError {
- if (LOG.isDebugEnabled()) {
- LOG.debug("deleteTable: table=" + new String(tableName));
- }
- try {
- if (!admin.tableExists(tableName)) {
- throw new IOError("table does not exist");
- }
- admin.deleteTable(tableName);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
+ private static final Log LOG = LogFactory.getLog(ThriftServer.class);
- public void mutateRow(byte[] tableName, byte[] row,
- List<Mutation> mutations) throws IOError, IllegalArgument {
- mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
- }
+ private static final String MIN_WORKERS_OPTION = "minWorkers";
+ private static final String MAX_WORKERS_OPTION = "workers";
+ private static final String MAX_QUEUE_SIZE_OPTION = "queue";
+ private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
+ static final String BIND_OPTION = "bind";
+ static final String COMPACT_OPTION = "compact";
+ static final String FRAMED_OPTION = "framed";
+ static final String PORT_OPTION = "port";
- public void mutateRowTs(byte[] tableName, byte[] row,
- List<Mutation> mutations, long timestamp) throws IOError, IllegalArgument {
- HTable table = null;
- try {
- table = getTable(tableName);
- Put put = new Put(row, timestamp, null);
-
- Delete delete = new Delete(row);
-
- // I apologize for all this mess :)
- for (Mutation m : mutations) {
- byte[][] famAndQf = KeyValue.parseColumn(m.column);
- if (m.isDelete) {
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- } else {
- if(famAndQf.length == 1) {
- put.add(famAndQf[0], new byte[0], m.value);
- } else {
- put.add(famAndQf[0], famAndQf[1], m.value);
- }
- }
- }
- if (!delete.isEmpty())
- table.delete(delete);
- if (!put.isEmpty())
- table.put(put);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgument(e.getMessage());
- }
- }
-
- public void mutateRows(byte[] tableName, List<BatchMutation> rowBatches)
- throws IOError, IllegalArgument, TException {
- mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
- }
-
- public void mutateRowsTs(byte[] tableName, List<BatchMutation> rowBatches, long timestamp)
- throws IOError, IllegalArgument, TException {
- List<Put> puts = new ArrayList<Put>();
- List<Delete> deletes = new ArrayList<Delete>();
- if (metrics != null) {
- metrics.incNumBatchMutateRowKeys(rowBatches.size());
- }
-
- for (BatchMutation batch : rowBatches) {
- byte[] row = batch.row;
- List<Mutation> mutations = batch.mutations;
- Delete delete = new Delete(row);
- Put put = new Put(row, timestamp, null);
- for (Mutation m : mutations) {
- byte[][] famAndQf = KeyValue.parseColumn(m.column);
- if (m.isDelete) {
- // no qualifier, family only.
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- } else {
- if(famAndQf.length == 1) {
- put.add(famAndQf[0], new byte[0], m.value);
- } else {
- put.add(famAndQf[0], famAndQf[1], m.value);
- }
- }
- }
- if (!delete.isEmpty())
- deletes.add(delete);
- if (!put.isEmpty())
- puts.add(put);
- }
-
- HTable table = null;
- try {
- table = getTable(tableName);
- if (!puts.isEmpty())
- table.put(puts);
- for (Delete del : deletes) {
- table.delete(del);
- }
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgument(e.getMessage());
- }
- }
-
- /**
- * Warning; the puts and deletes are NOT atomic together and so a lot of
- * weird things can happen if you expect that to be the case!!
- *
- * A valueCheck of null means that the row can't exist before being put.
- * This is kind of a stupid thing to enforce when deleting, for obvious
- * reasons.
- */
- @Override
- public boolean checkAndMutateRow(byte[] tableName, byte[] row,
- byte[] columnCheck, byte[] valueCheck, List<Mutation> mutations)
- throws IOError, IllegalArgument {
- return checkAndMutateRowTs(tableName, row, columnCheck, valueCheck,
- mutations, HConstants.LATEST_TIMESTAMP);
- }
-
- @Override
- public boolean checkAndMutateRowTs(byte[] tableName, byte[] row,
- byte[] columnCheck, byte[] valueCheck,
- List<Mutation> mutations,
- long timestamp) throws IOError, IllegalArgument {
- HTable table;
- try {
- table = getTable(tableName);
- Put put = new Put(row, timestamp, null);
-
- Delete delete = new Delete(row);
-
- for (Mutation m : mutations) {
- byte[][] famAndQf = KeyValue.parseColumn(m.column);
- if (m.isDelete) {
- if (famAndQf.length == 1) {
- delete.deleteFamily(famAndQf[0], timestamp);
- } else {
- delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
- }
- } else {
- if (famAndQf.length == 1) {
- put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, m.value);
- } else {
- put.add(famAndQf[0], famAndQf[1], m.value);
- }
- }
- }
- byte[][] famAndQfCheck = KeyValue.parseColumn(columnCheck);
-
- if (!delete.isEmpty() && !put.isEmpty()) {
- // can't do both, not atomic, not good idea!
- throw new IllegalArgumentException(
- "Single Thrift CheckAndMutate call cannot do both puts and deletes.");
- }
- if (!delete.isEmpty()) {
- return table.checkAndDelete(row, famAndQfCheck[0],
- famAndQfCheck.length != 1 ? famAndQfCheck[1]
- : HConstants.EMPTY_BYTE_ARRAY, valueCheck, delete);
- }
- if (!put.isEmpty()) {
- return table.checkAndPut(row, famAndQfCheck[0],
- famAndQfCheck.length != 1 ? famAndQfCheck[1]
- : HConstants.EMPTY_BYTE_ARRAY, valueCheck, put);
- }
- throw new IllegalArgumentException(
- "Thrift CheckAndMutate call must do either put or delete.");
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgument(e.getMessage());
- }
- }
-
- @Deprecated
- public long atomicIncrement(byte[] tableName, byte[] row, byte[] column,
- long amount) throws IOError, IllegalArgument, TException {
- byte [][] famAndQf = KeyValue.parseColumn(column);
- if(famAndQf.length == 1) {
- return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
- amount);
- }
- return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
- }
-
- public long atomicIncrement(byte [] tableName, byte [] row, byte [] family,
- byte [] qualifier, long amount)
- throws IOError, IllegalArgument, TException {
- HTable table;
- try {
- table = getTable(tableName);
- return table.incrementColumnValue(row, family, qualifier, amount);
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public void scannerClose(int id) throws IOError, IllegalArgument {
- LOG.debug("scannerClose: id=" + id);
- ResultScanner scanner = getScanner(id);
- if (scanner == null) {
- throw new IllegalArgument("scanner ID is invalid");
- }
- scanner.close();
- removeScanner(id);
- }
-
- public List<TRowResult> scannerGetList(int id,int nbRows) throws IllegalArgument, IOError {
- LOG.debug("scannerGetList: id=" + id);
- ResultScanner scanner = getScanner(id);
- if (null == scanner) {
- throw new IllegalArgument("scanner ID is invalid");
- }
-
- Result [] results = null;
- try {
- results = scanner.next(nbRows);
- if (null == results) {
- return new ArrayList<TRowResult>();
- }
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- return ThriftUtilities.rowResultFromHBase(results);
- }
- public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
- return scannerGetList(id,1);
- }
- public int scannerOpen(byte[] tableName, byte[] startRow,
- List<byte[]> columns) throws IOError {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(startRow);
- if(columns != null && columns.size() != 0) {
- for(byte [] column : columns) {
- byte [][] famQf = KeyValue.parseColumn(column);
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public int scannerOpenWithStop(byte[] tableName, byte[] startRow,
- byte[] stopRow, List<byte[]> columns) throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(startRow, stopRow);
- if(columns != null && columns.size() != 0) {
- for(byte [] column : columns) {
- byte [][] famQf = KeyValue.parseColumn(column);
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public int scannerOpenWithPrefix(byte[] tableName, byte[] startAndPrefix, List<byte[]> columns) throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(startAndPrefix);
- Filter f = new WhileMatchFilter(
- new PrefixFilter(startAndPrefix));
- scan.setFilter(f);
- if(columns != null && columns.size() != 0) {
- for(byte [] column : columns) {
- byte [][] famQf = KeyValue.parseColumn(column);
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public int scannerOpenWithScan(byte [] tableName, TScan tScan) throws IOError {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan();
- if (tScan.isSetStartRow()) {
- scan.setStartRow(tScan.getStartRow());
- }
- if (tScan.isSetStopRow()) {
- scan.setStopRow(tScan.getStopRow());
- }
- if (tScan.isSetTimestamp()) {
- scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
- }
- if (tScan.isSetCaching()) {
- scan.setCaching(tScan.getCaching());
- }
- if(tScan.isSetColumns() && tScan.getColumns().size() != 0) {
- for(byte [] column : tScan.getColumns()) {
- byte [][] famQf = KeyValue.parseColumn(column);
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- if (tScan.isSetFilterString()) {
- ParseFilter parseFilter = new ParseFilter();
- scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public int scannerOpenTs(byte[] tableName, byte[] startRow,
- List<byte[]> columns, long timestamp) throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(startRow);
- scan.setTimeRange(Long.MIN_VALUE, timestamp);
- if(columns != null && columns.size() != 0) {
- for(byte [] column : columns) {
- byte [][] famQf = KeyValue.parseColumn(column);
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public int scannerOpenWithStopTs(byte[] tableName, byte[] startRow,
- byte[] stopRow, List<byte[]> columns, long timestamp)
- throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(startRow, stopRow);
- scan.setTimeRange(Long.MIN_VALUE, timestamp);
- if(columns != null && columns.size() != 0) {
- for(byte [] column : columns) {
- byte [][] famQf = KeyValue.parseColumn(column);
- if(famQf.length == 1) {
- scan.addFamily(famQf[0]);
- } else {
- scan.addColumn(famQf[0], famQf[1]);
- }
- }
- }
- scan.setTimeRange(Long.MIN_VALUE, timestamp);
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public int scannerOpenWithFilterString(byte [] tableName,
- byte [] filterString) throws IOError, TException {
- return scannerOpenWithFilterStringTs(tableName, filterString, Long.MAX_VALUE);
- }
-
- @Override
- public int scannerOpenWithFilterStringTs(byte [] tableName, byte [] filterString,
- long timestamp) throws IOError, TException {
- return scannerOpenWithStopAndFilterStringTs(tableName,
- HConstants.EMPTY_START_ROW,
- HConstants.EMPTY_END_ROW,
- filterString, timestamp);
- }
-
- @Override
- public int scannerOpenWithStopAndFilterString(byte [] tableName,
- byte [] startRow, byte [] stopRow,
- byte [] filterString)
- throws IOError, TException {
- return scannerOpenWithStopAndFilterStringTs(tableName, startRow, stopRow,
- filterString, Long.MAX_VALUE);
- }
-
- @Override
- public int scannerOpenWithStopAndFilterStringTs(byte [] tableName, byte [] startRow,
- byte [] stopRow, byte [] filterString,
- long timestamp) throws IOError, TException {
- try {
- HTable table = getTable(tableName);
- Scan scan = new Scan(startRow, stopRow);
- scan.setTimeRange(Long.MIN_VALUE, timestamp);
-
- if (filterString != null && filterString.length != 0) {
- ParseFilter parseFilter = new ParseFilter();
- scan.setFilter(parseFilter.parseFilterString(filterString));
- }
- return addScanner(table.getScanner(scan));
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- public Map<byte[], ColumnDescriptor> getColumnDescriptors(
- byte[] tableName) throws IOError, TException {
- try {
- TreeMap<byte[], ColumnDescriptor> columns =
- new TreeMap<byte[], ColumnDescriptor>(Bytes.BYTES_COMPARATOR);
-
- HTable table = getTable(tableName);
- HTableDescriptor desc = table.getTableDescriptor();
-
- for (HColumnDescriptor e : desc.getFamilies()) {
- ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
- columns.put(col.name, col);
- }
- return columns;
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- @Override
- public TRegionInfo getRegionInfo(byte[] searchRow) throws IOError,
- TException {
- try {
- HTable table = getTable(HConstants.META_TABLE_NAME);
- Result startRowResult = table.getRowOrBefore(searchRow,
- HConstants.CATALOG_FAMILY);
-
- if (startRowResult == null) {
- throw new IOException("Cannot find row in .META., row="
- + Bytes.toString(searchRow));
- }
-
- // find region start and end keys
- byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- if (value == null || value.length == 0) {
- throw new IOException("HRegionInfo REGIONINFO was null or "
- + " empty in Meta for row=" + Bytes.toString(searchRow));
- }
- HRegionInfo regionInfo = Writables.getHRegionInfo(value);
- TRegionInfo region = new TRegionInfo();
- region.setStartKey(regionInfo.getStartKey());
- region.setEndKey(regionInfo.getEndKey());
- region.id = regionInfo.getRegionId();
- region.setName(regionInfo.getRegionName());
- region.version = regionInfo.getVersion();
-
- // find region assignment to server
- value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER);
- if (value != null && value.length > 0) {
- String address = Bytes.toString(value);
- HServerAddress server = new HServerAddress(address);
- byte[] hostname = Bytes.toBytes(server.getHostname());
- region.serverName = hostname;
- region.port = server.getPort();
- }
- return region;
- } catch (IOException e) {
- throw new IOError(e.getMessage());
- }
- }
-
- }
+ private Configuration conf;
+ ThriftServerRunner serverRunner;
//
// Main program and support routines
//
- private static void printUsageAndExit(Options options, int exitCode) {
+ public ThriftServer(Configuration conf) {
+ this.conf = HBaseConfiguration.create(conf);
+ }
+
+ private static void printUsageAndExit(Options options, int exitCode)
+ throws ExitCodeException {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Thrift", null, options,
- "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" +
- "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" +
- " send a kill signal to the thrift server pid",
- true);
- System.exit(exitCode);
+ "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" +
+ "To shutdown the thrift server run 'bin/hbase-daemon.sh stop " +
+ "thrift' or send a kill signal to the thrift server pid",
+ true);
+ throw new ExitCodeException(exitCode, "");
}
- static final String DEFAULT_LISTEN_PORT = "9090";
-
- /*
- * Start up the Thrift server.
+ /**
+   * Starts up or shuts down the Thrift server, depending on the arguments.
* @param args
*/
- static private void doMain(final String[] args) throws Exception {
- Log LOG = LogFactory.getLog("ThriftServer");
+ void doMain(final String[] args) throws Exception {
+ processOptions(args);
+ serverRunner = new ThriftServerRunner(conf, HConstants.THRIFT_PROXY_PREFIX);
+ serverRunner.run();
+ }
+ /**
+   * Parse the command line options to set parameters in the conf.
+ */
+ private void processOptions(final String[] args) throws Exception {
Options options = new Options();
- options.addOption("b", "bind", true, "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]");
- options.addOption("p", "port", true, "Port to bind to [default: 9090]");
- options.addOption("f", "framed", false, "Use framed transport");
- options.addOption("c", "compact", false, "Use the compact protocol");
+ options.addOption("b", BIND_OPTION, true, "Address to bind " +
+ "the Thrift server to. Not supported by the following server types: " +
+ Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()) +
+ ". Default value: " + HConstants.DEFAULT_HOST + ".");
+ options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
+ HConstants.DEFAULT_THRIFT_PROXY_PORT + "]");
+ options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
+ options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information");
- options.addOption("m", "minWorkers", true, "The minimum number of worker " +
- "threads for " + THREAD_POOL_SERVER_CLASS.getSimpleName());
- options.addOption("w", "workers", true, "The maximum number of worker " +
- "threads for " + THREAD_POOL_SERVER_CLASS.getSimpleName());
- options.addOption("q", "queue", true, "The maximum number of queued " +
- "requests in " + THREAD_POOL_SERVER_CLASS.getSimpleName());
-
- OptionGroup servers = new OptionGroup();
- servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
- servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
- servers.addOption(new Option("threadpool", false, "Use "
- + THREAD_POOL_SERVER_CLASS.getSimpleName() + ". This is the default."));
- options.addOptionGroup(servers);
+
+ options.addOption("m", MIN_WORKERS_OPTION, true,
+ "The minimum number of worker threads for " +
+ ImplType.THREAD_POOL.simpleClassName());
+
+ options.addOption("w", MAX_WORKERS_OPTION, true,
+ "The maximum number of worker threads for " +
+ ImplType.THREAD_POOL.simpleClassName());
+
+ options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
+ "The maximum number of queued requests in " +
+ ImplType.THREAD_POOL.simpleClassName());
+
+ options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
+      "The amount of time in seconds to keep a thread alive when idle in " +
+ ImplType.THREAD_POOL.simpleClassName());
+
+ options.addOptionGroup(ImplType.createOptionGroup());
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
- Configuration conf = HBaseConfiguration.create();
-
- /**
- * This is so complicated to please both bin/hbase and bin/hbase-daemon.
- * hbase-daemon provides "start" and "stop" arguments
- * hbase should print the help if no argument is provided
- */
+ // This is so complicated to please both bin/hbase and bin/hbase-daemon.
+ // hbase-daemon provides "start" and "stop" arguments
+ // hbase should print the help if no argument is provided
List<String> commandLine = Arrays.asList(args);
boolean stop = commandLine.contains("stop");
boolean start = commandLine.contains("start");
- if (cmd.hasOption("help") || !start || stop) {
+ boolean invalidStartStop = (start && stop) || (!start && !stop);
+ if (cmd.hasOption("help") || invalidStartStop) {
+ if (invalidStartStop) {
+ LOG.error("Exactly one of 'start' and 'stop' has to be specified");
+ }
printUsageAndExit(options, 1);
}
// Get port to bind to
- int listenPort = 0;
try {
- listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
+ int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION,
+ String.valueOf(HConstants.DEFAULT_THRIFT_PROXY_PORT)));
+ conf.setInt(CONF_PREFIX + HConstants.THRIFT_PORT_SUFFIX, listenPort);
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the port option", e);
printUsageAndExit(options, -1);
}
// Make optional changes to the configuration based on command-line options
- if (cmd.hasOption("minWorkers")) {
- conf.set(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
- cmd.getOptionValue("minWorkers"));
- }
-
- if (cmd.hasOption("workers")) {
- conf.set(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
- cmd.getOptionValue("workers"));
- }
-
- if (cmd.hasOption("queue")) {
- conf.set(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
- cmd.getOptionValue("queue"));
- }
-
- // Only instantiate this when finished modifying the configuration
- TBoundedThreadPoolServer.Options serverOptions =
- new TBoundedThreadPoolServer.Options(conf);
-
- // Construct correct ProtocolFactory
- TProtocolFactory protocolFactory;
- if (cmd.hasOption("compact")) {
- LOG.debug("Using compact protocol");
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- LOG.debug("Using binary protocol");
- protocolFactory = new TBinaryProtocol.Factory();
+ optionToConf(cmd, MIN_WORKERS_OPTION,
+ conf, CONF_PREFIX + TBoundedThreadPoolServer.MIN_WORKER_THREADS_SUFFIX);
+ optionToConf(cmd, MAX_WORKERS_OPTION,
+ conf, CONF_PREFIX + TBoundedThreadPoolServer.MAX_WORKER_THREADS_SUFFIX);
+ optionToConf(cmd, MAX_QUEUE_SIZE_OPTION,
+ conf, CONF_PREFIX + TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_SUFFIX);
+ optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
+ conf, CONF_PREFIX + TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_SUFFIX);
+
+ // Set general thrift server options
+ conf.setBoolean(CONF_PREFIX + HConstants.THRIFT_COMPACT_SUFFIX, cmd.hasOption(COMPACT_OPTION));
+ conf.setBoolean(CONF_PREFIX + HConstants.THRIFT_FRAMED_SUFFIX, cmd.hasOption(FRAMED_OPTION));
+ if (cmd.hasOption(BIND_OPTION)) {
+ conf.set(CONF_PREFIX + HConstants.THRIFT_BIND_SUFFIX, cmd.getOptionValue(BIND_OPTION));
}
- ThriftMetrics metrics = new ThriftMetrics(listenPort, conf);
- Hbase.Iface handler = new HBaseHandler(conf, metrics);
- handler = HbaseHandlerMetricsProxy.newInstance(handler, metrics, conf);
- Hbase.Processor processor = new Hbase.Processor(handler);
-
- TServer server;
- if (cmd.hasOption("nonblocking") || cmd.hasOption("hsha")) {
- if (cmd.hasOption("bind")) {
- LOG.error("The Nonblocking and HsHa servers don't support IP address binding at the moment." +
- " See https://issues.apache.org/jira/browse/HBASE-2155 for details.");
- printUsageAndExit(options, -1);
- }
-
- TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(listenPort);
- TFramedTransport.Factory transportFactory = new TFramedTransport.Factory();
-
- if (cmd.hasOption("nonblocking")) {
- LOG.info("starting HBase Nonblocking Thrift server on " + Integer.toString(listenPort));
- server = new TNonblockingServer(processor, serverTransport, transportFactory, protocolFactory);
- } else {
- LOG.info("starting HBase HsHA Thrift server on " + Integer.toString(listenPort));
- server = new THsHaServer(processor, serverTransport, transportFactory, protocolFactory);
- }
- } else {
- // Get IP address to bind to
- InetAddress listenAddress = null;
- if (cmd.hasOption("bind")) {
- try {
- listenAddress = InetAddress.getByName(cmd.getOptionValue("bind"));
- } catch (UnknownHostException e) {
- LOG.error("Could not bind to provided ip address", e);
- printUsageAndExit(options, -1);
- }
- } else {
- listenAddress = InetAddress.getLocalHost();
- }
- TServerTransport serverTransport =
- new TServerSocket(new InetSocketAddress(listenAddress, listenPort));
+ ImplType.setServerImpl(cmd, conf, CONF_PREFIX);
+ }
- // Construct correct TransportFactory
- TTransportFactory transportFactory;
- if (cmd.hasOption("framed")) {
- transportFactory = new TFramedTransport.Factory();
- LOG.debug("Using framed transport");
- } else {
- transportFactory = new TTransportFactory();
- }
+ public void stop() {
+ serverRunner.shutdown();
+ }
- LOG.info("starting " + THREAD_POOL_SERVER_CLASS.getSimpleName() + " on "
- + listenAddress + ":" + Integer.toString(listenPort)
- + "; minimum number of worker threads="
- + serverOptions.minWorkerThreads
- + ", maximum number of worker threads="
- + serverOptions.maxWorkerThreads + ", queued requests="
- + serverOptions.maxQueuedRequests);
-
- server = new TBoundedThreadPoolServer(processor, serverTransport,
- transportFactory, protocolFactory, serverOptions, metrics);
-
- if (server.getClass() != THREAD_POOL_SERVER_CLASS) {
- // A sanity check that we instantiated the right thing.
- throw new RuntimeException("Expected thread pool server class " +
- THREAD_POOL_SERVER_CLASS.getName() + " but got " +
- server.getClass().getName());
- }
+ private static void optionToConf(CommandLine cmd, String option,
+ Configuration conf, String destConfKey) {
+ if (cmd.hasOption(option)) {
+ String value = cmd.getOptionValue(option);
+ LOG.info("Set configuration key:" + destConfKey + " value:" + value);
+ conf.set(destConfKey, value);
}
-
- server.serve();
}
/**
@@ -1226,6 +186,10 @@ public class ThriftServer {
* @throws Exception
*/
public static void main(String [] args) throws Exception {
- doMain(args);
+ try {
+ new ThriftServer(HBaseConfiguration.create()).doMain(args);
+ } catch (ExitCodeException ex) {
+ System.exit(ex.getExitCode());
+ }
}
}