You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/17 18:49:34 UTC
svn commit: r1232506 - in
/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift:
HThreadedSelectorServerArgs.java ThriftServerRunner.java
Author: tedyu
Date: Tue Jan 17 17:49:34 2012
New Revision: 1232506
URL: http://svn.apache.org/viewvc?rev=1232506&view=rev
Log:
HBASE-5201 Add new files
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java?rev=1232506&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java Tue Jan 17 17:49:34 2012
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import java.util.Locale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TThreadedSelectorServer.Args} that fills in its tunables from a
+ * Hadoop {@link Configuration}. A key that is absent from the configuration
+ * keeps the value the superclass getters already report for it.
+ */
+public class HThreadedSelectorServerArgs extends TThreadedSelectorServer.Args {
+
+  // NOTE(review): the logger is registered under TThreadedSelectorServer rather
+  // than this class; left unchanged so existing log configuration still matches.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TThreadedSelectorServer.class);
+
+  /**
+   * Number of selector threads for reading and writing socket
+   */
+  public static final String SELECTOR_THREADS_CONF_KEY =
+      "hbase.thrift.selector.threads";
+
+  /**
+   * Number of threads for processing the thrift calls
+   */
+  public static final String WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.worker.threads";
+
+  /**
+   * Time to wait for server to stop gracefully
+   */
+  public static final String STOP_TIMEOUT_CONF_KEY =
+      "hbase.thrift.stop.timeout.seconds";
+
+  /**
+   * Maximum number of accepted elements per selector
+   */
+  public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY =
+      "hbase.thrift.accept.queue.size.per.selector";
+
+  /**
+   * The strategy for handling new accepted connections.
+   */
+  public static final String ACCEPT_POLICY_CONF_KEY =
+      "hbase.thrift.accept.policy";
+
+  /**
+   * Creates the arguments for the given transport and immediately applies the
+   * settings found in {@code conf}.
+   *
+   * @param transport the non-blocking server transport passed to the superclass
+   * @param conf configuration to read the {@code *_CONF_KEY} settings from
+   */
+  public HThreadedSelectorServerArgs(
+      TNonblockingServerTransport transport, Configuration conf) {
+    super(transport);
+    readConf(conf);
+  }
+
+  /**
+   * Reads every supported key from {@code conf}, falling back to the current
+   * superclass value, applies the results and logs them.
+   *
+   * @param conf configuration to read from
+   * @throws IllegalArgumentException if the accept policy value does not name
+   *         an {@code AcceptPolicy} constant (case-insensitively)
+   */
+  private void readConf(Configuration conf) {
+    int selectorThreads = conf.getInt(
+        SELECTOR_THREADS_CONF_KEY, getSelectorThreads());
+    int workerThreads = conf.getInt(
+        WORKER_THREADS_CONF_KEY, getWorkerThreads());
+    int stopTimeoutVal = conf.getInt(
+        STOP_TIMEOUT_CONF_KEY, getStopTimeoutVal());
+    int acceptQueueSizePerThread = conf.getInt(
+        ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY, getAcceptQueueSizePerThread());
+
+    // Upper-case with Locale.ROOT so the enum lookup does not depend on the
+    // JVM default locale (a Turkish locale maps 'i' to a dotted capital I,
+    // which would make "fair_accept" fail to match FAIR_ACCEPT).
+    String policyName = conf.get(
+        ACCEPT_POLICY_CONF_KEY, getAcceptPolicy().toString());
+    AcceptPolicy acceptPolicy =
+        AcceptPolicy.valueOf(policyName.toUpperCase(Locale.ROOT));
+
+    super.selectorThreads(selectorThreads)
+        .workerThreads(workerThreads)
+        .stopTimeoutVal(stopTimeoutVal)
+        .acceptQueueSizePerThread(acceptQueueSizePerThread)
+        .acceptPolicy(acceptPolicy);
+
+    LOG.info("Read configuration selectorThreads:" + selectorThreads +
+        " workerThreads:" + workerThreads +
+        " stopTimeoutVal:" + stopTimeoutVal + "sec" +
+        " acceptQueueSizePerThread:" + acceptQueueSizePerThread +
+        " acceptPolicy:" + acceptPolicy);
+  }
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1232506&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Tue Jan 17 17:49:34 2012
@@ -0,0 +1,1237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import static org.apache.hadoop.hbase.util.Bytes.getBytes;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
+import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
+import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
+import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.hbase.thrift.generated.TScan;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.net.DNS;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.base.Joiner;
+
+/**
+ * ThriftServerRunner - this class starts up a Thrift server which implements
+ * the Hbase API specified in the Hbase.thrift IDL file.
+ */
+public class ThriftServerRunner implements Runnable {
+
+ private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
+
+ // Configuration key naming the server implementation; its value is compared
+ // against ImplType.option.
+ static final String SERVER_TYPE_CONF_KEY =
+ "hbase.regionserver.thrift.server.type";
+
+ // Configuration keys for the bind address, compact protocol, framed
+ // transport and listen port.
+ static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
+ static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
+ static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
+ static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
+
+ // Fallbacks used when BIND_CONF_KEY / PORT_CONF_KEY are not set.
+ private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
+ private static final int DEFAULT_LISTEN_PORT = 9090;
+
+ // Runner-side configuration, created from the one passed to the constructor.
+ private Configuration conf;
+ // The Thrift server instance; assigned in setupServer() and cleared by
+ // shutdown().
+ volatile TServer tserver;
+ // Hbase.Iface implementation wrapped by the Thrift processor.
+ private final HBaseHandler handler;
+
+ /**
+  * The Thrift server implementations this runner can start. Each constant
+  * carries its command-line option name, whether it always uses the framed
+  * transport, the server class it maps to, and whether a bind IP address may
+  * be specified for it.
+  */
+ enum ImplType {
+   HS_HA("hsha", true, THsHaServer.class, false),
+   NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
+   THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
+   THREADED_SELECTOR(
+       "threadedselector", true, TThreadedSelectorServer.class, false);
+
+   /** Implementation used when the configuration does not name one. */
+   public static final ImplType DEFAULT = THREAD_POOL;
+
+   final String option;
+   final boolean isAlwaysFramed;
+   final Class<? extends TServer> serverClass;
+   final boolean canSpecifyBindIP;
+
+   ImplType(String option, boolean isAlwaysFramed,
+       Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
+     this.option = option;
+     this.isAlwaysFramed = isAlwaysFramed;
+     this.serverClass = serverClass;
+     this.canSpecifyBindIP = canSpecifyBindIP;
+   }
+
+   /**
+    * @return <code>-option</code> so we can get the list of options from
+    *         {@link #values()}
+    */
+   @Override
+   public String toString() {
+     return "-" + option;
+   }
+
+   /**
+    * Builds the help text shown for this implementation's command-line option.
+    * Sentences are terminated and separated by a single space so they no
+    * longer run into each other.
+    *
+    * @return human-readable description of this server type
+    */
+   String getDescription() {
+     StringBuilder sb = new StringBuilder("Use the ")
+         .append(serverClass.getSimpleName())
+         .append('.');
+     if (isAlwaysFramed) {
+       sb.append(" This implies the framed transport.");
+     }
+     if (this == DEFAULT) {
+       sb.append(" This is the default.");
+     }
+     return sb.toString();
+   }
+
+   /**
+    * @return an option group holding one command-line option per server type
+    */
+   static OptionGroup createOptionGroup() {
+     OptionGroup group = new OptionGroup();
+     for (ImplType t : values()) {
+       group.addOption(new Option(t.option, t.getDescription()));
+     }
+     return group;
+   }
+
+   /**
+    * Looks up the server type named by {@link #SERVER_TYPE_CONF_KEY}, using
+    * {@link #DEFAULT} when the key is not set.
+    *
+    * @param conf configuration to read the server type from
+    * @return the matching server type
+    * @throws AssertionError if the configured value matches no option
+    */
+   static ImplType getServerImpl(Configuration conf) {
+     String confType = conf.get(SERVER_TYPE_CONF_KEY, DEFAULT.option);
+     for (ImplType t : values()) {
+       if (confType.equals(t.option)) {
+         return t;
+       }
+     }
+     throw new AssertionError("Unknown server ImplType.option:" + confType);
+   }
+
+   /**
+    * Stores the server type selected on the command line into the
+    * configuration under {@link #SERVER_TYPE_CONF_KEY}.
+    *
+    * @param cmd parsed command line
+    * @param conf configuration to update
+    * @throws AssertionError unless exactly one server-type option is present
+    */
+   static void setServerImpl(CommandLine cmd, Configuration conf) {
+     ImplType chosenType = null;
+     int numChosen = 0;
+     for (ImplType t : values()) {
+       if (cmd.hasOption(t.option)) {
+         chosenType = t;
+         ++numChosen;
+       }
+     }
+     if (numChosen != 1) {
+       throw new AssertionError("Exactly one option out of " +
+           Arrays.toString(values()) + " has to be specified");
+     }
+     LOG.info("Setting thrift server to " + chosenType.option);
+     conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
+   }
+
+   /** @return the simple class name of the server implementation */
+   public String simpleClassName() {
+     return serverClass.getSimpleName();
+   }
+
+   /**
+    * @return simple class names of all server types for which a bind IP
+    *         address cannot be specified
+    */
+   public static List<String> serversThatCannotSpecifyBindIP() {
+     List<String> l = new ArrayList<String>();
+     for (ImplType t : values()) {
+       if (!t.canSpecifyBindIP) {
+         l.add(t.simpleClassName());
+       }
+     }
+     return l;
+   }
+
+ }
+
+ /**
+  * Creates a runner backed by a new {@link HBaseHandler} built from the
+  * given configuration.
+  *
+  * @param conf configuration handed to the handler and to the runner
+  * @throws IOException if the handler cannot be constructed
+  */
+ public ThriftServerRunner(Configuration conf) throws IOException {
+   this(conf, new HBaseHandler(conf));
+ }
+
+ /**
+  * Creates a runner that dispatches Thrift calls to the supplied handler.
+  *
+  * @param conf configuration passed to {@code HBaseConfiguration.create}
+  *        to obtain the runner's own configuration
+  * @param handler the {@code Hbase.Iface} implementation to serve
+  */
+ public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
+   this.handler = handler;
+   this.conf = HBaseConfiguration.create(conf);
+ }
+
+ /**
+  * Sets up the Thrift server and serves requests on the calling thread.
+  * If setup or serving throws, the failure is logged together with its
+  * exception and the JVM is terminated with exit code -1.
+  */
+ @Override
+ public void run() {
+   try {
+     setupServer();
+     tserver.serve();
+   } catch (Exception e) {
+     // Pass the exception to the logger; without it the reason for the
+     // process exit is lost.
+     LOG.fatal("Cannot run ThriftServer", e);
+     // Crash the process if the ThriftServer is not running
+     System.exit(-1);
+   }
+ }
+
+ public void shutdown() {
+ if (tserver != null) {
+ tserver.stop();
+ tserver = null;
+ }
+ }
+
+ /**
+  * Builds the Thrift {@link TServer} selected by the configuration and stores
+  * it in {@link #tserver}; the server is not started here.
+  * <p>
+  * Reads the listen port, protocol (compact or binary) and transport (framed
+  * or plain) from the runner configuration, rejects a configured bind
+  * address for server types that cannot specify one, instantiates the
+  * server, and finally logs in the server principal when security is
+  * enabled.
+  *
+  * @throws Exception if the configuration is unsupported, or creating the
+  *         server transport or logging in fails
+  */
+ private void setupServer() throws Exception {
+   // Get port to bind to
+   int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+
+   // Construct correct ProtocolFactory
+   TProtocolFactory protocolFactory;
+   if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
+     LOG.debug("Using compact protocol");
+     protocolFactory = new TCompactProtocol.Factory();
+   } else {
+     LOG.debug("Using binary protocol");
+     protocolFactory = new TBinaryProtocol.Factory();
+   }
+
+   Hbase.Processor<Hbase.Iface> processor =
+       new Hbase.Processor<Hbase.Iface>(handler);
+   ImplType implType = ImplType.getServerImpl(conf);
+
+   // Construct correct TransportFactory
+   TTransportFactory transportFactory;
+   if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
+     transportFactory = new TFramedTransport.Factory();
+     LOG.debug("Using framed transport");
+   } else {
+     transportFactory = new TTransportFactory();
+   }
+
+   // A bind address is only accepted by server types that can specify one.
+   if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+     LOG.error("Server types " + Joiner.on(", ").join(
+         ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
+         "address binding at the moment. See " +
+         "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
+     throw new RuntimeException(
+         "-" + BIND_CONF_KEY + " not supported with " + implType);
+   }
+
+   if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+       implType == ImplType.THREADED_SELECTOR) {
+
+     // The non-blocking server types share one transport on the listen port.
+     TNonblockingServerTransport serverTransport =
+         new TNonblockingServerSocket(listenPort);
+
+     if (implType == ImplType.NONBLOCKING) {
+       TNonblockingServer.Args serverArgs =
+           new TNonblockingServer.Args(serverTransport);
+       serverArgs.processor(processor)
+           .transportFactory(transportFactory)
+           .protocolFactory(protocolFactory);
+       tserver = new TNonblockingServer(serverArgs);
+     } else if (implType == ImplType.HS_HA) {
+       THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+       serverArgs.processor(processor)
+           .transportFactory(transportFactory)
+           .protocolFactory(protocolFactory);
+       tserver = new THsHaServer(serverArgs);
+     } else { // THREADED_SELECTOR
+       TThreadedSelectorServer.Args serverArgs =
+           new HThreadedSelectorServerArgs(serverTransport, conf);
+       serverArgs.processor(processor)
+           .transportFactory(transportFactory)
+           .protocolFactory(protocolFactory);
+       tserver = new TThreadedSelectorServer(serverArgs);
+     }
+     LOG.info("starting HBase " + implType.simpleClassName() +
+         " server on " + Integer.toString(listenPort));
+   } else if (implType == ImplType.THREAD_POOL) {
+     // Thread pool server. Get the IP address to bind to.
+     InetAddress listenAddress = getBindAddress(conf);
+
+     TServerTransport serverTransport = new TServerSocket(
+         new InetSocketAddress(listenAddress, listenPort));
+
+     TBoundedThreadPoolServer.Args serverArgs =
+         new TBoundedThreadPoolServer.Args(serverTransport, conf);
+     serverArgs.processor(processor)
+         .transportFactory(transportFactory)
+         .protocolFactory(protocolFactory);
+     LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+         + listenAddress + ":" + Integer.toString(listenPort)
+         + "; " + serverArgs);
+     tserver = new TBoundedThreadPoolServer(serverArgs);
+   } else {
+     throw new AssertionError("Unsupported Thrift server implementation: " +
+         implType.simpleClassName());
+   }
+
+   // A sanity check that we instantiated the right type of server.
+   if (tserver.getClass() != implType.serverClass) {
+     throw new AssertionError("Expected to create Thrift server class " +
+         implType.serverClass.getName() + " but got " +
+         tserver.getClass().getName());
+   }
+
+   // login the server principal (if using secure Hadoop)
+   // The login reads the handler's configuration, not the runner's copy; a
+   // distinct local name avoids shadowing the 'conf' field used above.
+   Configuration handlerConf = handler.conf;
+   if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(handlerConf)) {
+     String machineName = Strings.domainNamePointerToHostName(
+         DNS.getDefaultHost(
+             handlerConf.get("hbase.thrift.dns.interface", "default"),
+             handlerConf.get("hbase.thrift.dns.nameserver", "default")));
+     User.login(handlerConf, "hbase.thrift.keytab.file",
+         "hbase.thrift.kerberos.principal", machineName);
+   }
+ }
+
+ /**
+  * Resolves the address named by {@link #BIND_CONF_KEY}, or
+  * {@link #DEFAULT_BIND_ADDR} when the key is not set.
+  *
+  * @param conf configuration to read the bind address from
+  * @return the resolved address
+  * @throws UnknownHostException if the configured value cannot be resolved
+  */
+ private InetAddress getBindAddress(Configuration conf)
+     throws UnknownHostException {
+   return InetAddress.getByName(conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR));
+ }
+
+ /**
+ * The HBaseHandler is a glue object that connects Thrift RPC calls to the
+ * HBase client API primarily defined in the HBaseAdmin and HTable objects.
+ */
+ public static class HBaseHandler implements Hbase.Iface {
+ // Configuration used to create the HBaseAdmin and every HTable.
+ protected Configuration conf;
+ // Admin client; created in the constructor.
+ protected HBaseAdmin admin = null;
+ protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+ // nextScannerId and scannerMap are used to manage scanner state
+ protected int nextScannerId = 0;
+ protected HashMap<Integer, ResultScanner> scannerMap = null;
+
+ // Per-thread map from table name to HTable, filled lazily by getTable();
+ // each thread starts with an empty TreeMap.
+ private static ThreadLocal<Map<String, HTable>> threadLocalTables =
+ new ThreadLocal<Map<String, HTable>>() {
+ @Override
+ protected Map<String, HTable> initialValue() {
+ return new TreeMap<String, HTable>();
+ }
+ };
+
+ /**
+  * Lists every column family of the given table, each name followed by the
+  * family delimiter.
+  *
+  * @param table table whose descriptor is inspected
+  * @return one entry per column family: family name plus
+  *         {@code KeyValue.COLUMN_FAMILY_DELIM_ARRAY}
+  * @throws IOException if the table descriptor cannot be fetched
+  */
+ byte[][] getAllColumns(HTable table) throws IOException {
+   HColumnDescriptor[] families =
+       table.getTableDescriptor().getColumnFamilies();
+   byte[][] names = new byte[families.length][];
+   int idx = 0;
+   for (HColumnDescriptor family : families) {
+     names[idx++] =
+         Bytes.add(family.getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
+   }
+   return names;
+ }
+
+ /**
+  * Returns the calling thread's cached {@link HTable} for the given table
+  * name, creating and caching it on first use.
+  *
+  * @param tableName
+  *          name of table
+  * @return HTable object
+  * @throws IOException if the HTable cannot be created
+  */
+ protected HTable getTable(final byte[] tableName) throws
+     IOException {
+   // Bytes.toString gives a cache key that does not depend on the platform
+   // default charset, unlike new String(byte[]).
+   String name = Bytes.toString(tableName);
+   Map<String, HTable> tables = threadLocalTables.get();
+   // Single lookup; the map never holds null values.
+   HTable table = tables.get(name);
+   if (table == null) {
+     table = new HTable(conf, tableName);
+     tables.put(name, table);
+   }
+   return table;
+ }
+
+ /**
+  * Converts the buffer to a byte array with {@code getBytes} and delegates to
+  * {@link #getTable(byte[])}.
+  *
+  * @param tableName name of table
+  * @return HTable object
+  * @throws IOException if the HTable cannot be created
+  */
+ protected HTable getTable(final ByteBuffer tableName) throws IOException {
+   byte[] nameBytes = getBytes(tableName);
+   return getTable(nameBytes);
+ }
+
+ /**
+ * Assigns a unique ID to the scanner and adds the mapping to an internal
+ * hash-map.
+ *
+ * @param scanner
+ * @return integer scanner id
+ */
+ protected synchronized int addScanner(ResultScanner scanner) {
+ int id = nextScannerId++;
+ scannerMap.put(id, scanner);
+ return id;
+ }
+
+ /**
+  * Looks up a registered scanner by id.
+  *
+  * @param id scanner id
+  * @return a Scanner, or null if ID was invalid.
+  */
+ protected synchronized ResultScanner getScanner(int id) {
+   ResultScanner found = scannerMap.get(id);
+   return found;
+ }
+
+ /**
+  * Unregisters the scanner with the given id from the id-to-scanner map.
+  *
+  * @param id scanner id
+  * @return a Scanner, or null if ID was invalid.
+  */
+ protected synchronized ResultScanner removeScanner(int id) {
+   ResultScanner removed = scannerMap.remove(id);
+   return removed;
+ }
+
+ /**
+  * Constructs an HBaseHandler from {@code HBaseConfiguration.create()}.
+  *
+  * @throws IOException if the handler cannot be initialized
+  */
+ protected HBaseHandler() throws IOException {
+   this(HBaseConfiguration.create());
+ }
+
+ /**
+  * Constructs an HBaseHandler that uses the given configuration, creating
+  * the admin client and an empty scanner map.
+  *
+  * @param c configuration to use
+  * @throws IOException if the HBaseAdmin cannot be created
+  */
+ protected HBaseHandler(final Configuration c) throws IOException {
+   conf = c;
+   admin = new HBaseAdmin(c);
+   scannerMap = new HashMap<Integer, ResultScanner>();
+ }
+
+ @Override
+ public void enableTable(ByteBuffer tableName) throws IOError {
+ try{
+ admin.enableTable(getBytes(tableName));
+ } catch (IOException e) {
+ throw new IOError(e.getMessage());
+ }
+ }
+
+ @Override
+ public void disableTable(ByteBuffer tableName) throws IOError{
+ try{
+ admin.disableTable(getBytes(tableName));
+ } catch (IOException e) {
+ throw new IOError(e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
+ try {
+ return HTable.isTableEnabled(this.conf, getBytes(tableName));
+ } catch (IOException e) {
+ throw new IOError(e.getMessage());
+ }
+ }
+
+ /**
+  * Requests a compaction of the given table or region through the admin
+  * client.
+  *
+  * @param tableNameOrRegionName name of the table or region to compact
+  * @throws IOError carrying the message of an IOException or
+  *         InterruptedException raised by the admin call
+  */
+ @Override
+ public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
+   try {
+     admin.compact(getBytes(tableNameOrRegionName));
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so the calling thread can still observe
+     // the interruption after it has been translated to IOError.
+     Thread.currentThread().interrupt();
+     throw new IOError(e.getMessage());
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   }
+ }
+
+ /**
+  * Requests a major compaction of the given table or region through the
+  * admin client.
+  *
+  * @param tableNameOrRegionName name of the table or region to compact
+  * @throws IOError carrying the message of an IOException or
+  *         InterruptedException raised by the admin call
+  */
+ @Override
+ public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
+   try {
+     admin.majorCompact(getBytes(tableNameOrRegionName));
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag so the calling thread can still observe
+     // the interruption after it has been translated to IOError.
+     Thread.currentThread().interrupt();
+     throw new IOError(e.getMessage());
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   }
+ }
+
+ /**
+  * Lists the names of all tables reported by the admin client.
+  *
+  * @return one buffer per table, wrapping the table's name bytes
+  * @throws IOError carrying the message of any IOException raised
+  */
+ @Override
+ public List<ByteBuffer> getTableNames() throws IOError {
+   try {
+     HTableDescriptor[] descriptors = this.admin.listTables();
+     ArrayList<ByteBuffer> names =
+         new ArrayList<ByteBuffer>(descriptors.length);
+     for (HTableDescriptor descriptor : descriptors) {
+       names.add(ByteBuffer.wrap(descriptor.getName()));
+     }
+     return names;
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Lists the regions of the given table as Thrift {@link TRegionInfo}
+  * objects.
+  *
+  * @param tableName name of the table
+  * @return one entry per region; empty when the admin client returns null
+  * @throws IOError carrying the message of any IOException raised
+  */
+ @Override
+ public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
+     throws IOError {
+   try {
+     // getBytes() is used instead of tableName.array(): array() returns the
+     // whole backing array regardless of the buffer's position, limit and
+     // array offset, and throws for buffers without an accessible array.
+     List<HRegionInfo> hris =
+         this.admin.getTableRegions(getBytes(tableName));
+     List<TRegionInfo> regions = new ArrayList<TRegionInfo>();
+
+     if (hris != null) {
+       for (HRegionInfo regionInfo : hris) {
+         TRegionInfo region = new TRegionInfo();
+         region.startKey = ByteBuffer.wrap(regionInfo.getStartKey());
+         region.endKey = ByteBuffer.wrap(regionInfo.getEndKey());
+         region.id = regionInfo.getRegionId();
+         region.name = ByteBuffer.wrap(regionInfo.getRegionName());
+         region.version = regionInfo.getVersion();
+         regions.add(region);
+       }
+     }
+     return regions;
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   }
+ }
+
+ /**
+  * Fetches the cells of one column, given as a single column specifier that
+  * is split by {@code KeyValue.parseColumn}. When the specifier yields only
+  * a family, an empty qualifier is passed on.
+  */
+ @Deprecated
+ @Override
+ public List<TCell> get(
+     ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
+     throws IOError {
+   byte[][] parts = KeyValue.parseColumn(getBytes(column));
+   byte[] qualifier = (parts.length == 1) ? new byte[0] : parts[1];
+   return get(tableName, row, parts[0], qualifier);
+ }
+
+ /**
+  * Fetches the cells of a family or of a single column for one row.
+  *
+  * @param tableName name of the table
+  * @param row row key
+  * @param family column family
+  * @param qualifier column qualifier; null or empty selects the whole family
+  * @return the cells converted by {@code ThriftUtilities.cellFromHBase}
+  * @throws IOError carrying the message of any IOException raised
+  */
+ protected List<TCell> get(ByteBuffer tableName,
+     ByteBuffer row,
+     byte[] family,
+     byte[] qualifier) throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     Get request = new Get(getBytes(row));
+     boolean hasQualifier = qualifier != null && qualifier.length != 0;
+     if (hasQualifier) {
+       request.addColumn(family, qualifier);
+     } else {
+       request.addFamily(family);
+     }
+     return ThriftUtilities.cellFromHBase(table.get(request).raw());
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Fetches up to {@code numVersions} versions of one column, given as a
+  * single column specifier that is split by {@code KeyValue.parseColumn}.
+  * When the specifier yields only a family, an empty qualifier is passed on.
+  */
+ @Deprecated
+ @Override
+ public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
+     ByteBuffer column, int numVersions) throws IOError {
+   byte[][] parts = KeyValue.parseColumn(getBytes(column));
+   byte[] qualifier = (parts.length == 1) ? new byte[0] : parts[1];
+   return getVer(tableName, row, parts[0], qualifier, numVersions);
+ }
+
+ /**
+  * Fetches up to {@code numVersions} versions of one column for one row.
+  *
+  * @param tableName name of the table
+  * @param row row key
+  * @param family column family
+  * @param qualifier column qualifier
+  * @param numVersions maximum number of versions to request
+  * @return the cells converted by {@code ThriftUtilities.cellFromHBase}
+  * @throws IOError carrying the message of any IOException raised
+  */
+ public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
+     byte[] family,
+     byte[] qualifier, int numVersions) throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     Get request = new Get(getBytes(row));
+     request.addColumn(family, qualifier);
+     request.setMaxVersions(numVersions);
+     return ThriftUtilities.cellFromHBase(table.get(request).raw());
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Fetches up to {@code numVersions} versions of one column, bounded by a
+  * timestamp. The column is given as a single specifier that is split by
+  * {@code KeyValue.parseColumn}; when it yields only a family, an empty
+  * qualifier is passed on.
+  */
+ @Deprecated
+ @Override
+ public List<TCell> getVerTs(ByteBuffer tableName,
+     ByteBuffer row,
+     ByteBuffer column,
+     long timestamp,
+     int numVersions) throws IOError {
+   byte[][] parts = KeyValue.parseColumn(getBytes(column));
+   byte[] qualifier = (parts.length == 1) ? new byte[0] : parts[1];
+   return getVerTs(tableName, row, parts[0], qualifier, timestamp,
+       numVersions);
+ }
+
+ /**
+  * Fetches up to {@code numVersions} versions of one column for one row,
+  * using the time range {@code [Long.MIN_VALUE, timestamp)} as passed to
+  * {@code Get.setTimeRange}.
+  *
+  * @param tableName name of the table
+  * @param row row key
+  * @param family column family
+  * @param qualifier column qualifier
+  * @param timestamp upper bound handed to {@code setTimeRange}
+  * @param numVersions maximum number of versions to request
+  * @return the cells converted by {@code ThriftUtilities.cellFromHBase}
+  * @throws IOError carrying the message of any IOException raised
+  */
+ protected List<TCell> getVerTs(ByteBuffer tableName,
+     ByteBuffer row, byte [] family,
+     byte [] qualifier, long timestamp, int numVersions) throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     Get request = new Get(getBytes(row));
+     request.addColumn(family, qualifier);
+     request.setTimeRange(Long.MIN_VALUE, timestamp);
+     request.setMaxVersions(numVersions);
+     return ThriftUtilities.cellFromHBase(table.get(request).raw());
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Fetches a whole row; delegates to {@code getRowWithColumnsTs} with no
+  * column restriction and {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row)
+     throws IOError {
+   final List<ByteBuffer> allColumns = null;
+   return getRowWithColumnsTs(
+       tableName, row, allColumns, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+  * Fetches the given columns of a row; delegates to
+  * {@code getRowWithColumnsTs} with {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
+     ByteBuffer row,
+     List<ByteBuffer> columns) throws IOError {
+   final long latest = HConstants.LATEST_TIMESTAMP;
+   return getRowWithColumnsTs(tableName, row, columns, latest);
+ }
+
+ /**
+  * Fetches a whole row bounded by a timestamp; delegates to
+  * {@code getRowWithColumnsTs} with no column restriction.
+  */
+ @Override
+ public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
+     long timestamp) throws IOError {
+   final List<ByteBuffer> allColumns = null;
+   return getRowWithColumnsTs(tableName, row, allColumns, timestamp);
+ }
+
+ /**
+  * Fetches one row, optionally restricted to the given columns, using the
+  * time range {@code [Long.MIN_VALUE, timestamp)} as passed to
+  * {@code Get.setTimeRange}.
+  *
+  * @param tableName name of the table
+  * @param row row key
+  * @param columns column specifiers (family or family:qualifier); null
+  *        applies no column restriction
+  * @param timestamp upper bound handed to {@code setTimeRange}
+  * @return the result converted by {@code ThriftUtilities.rowResultFromHBase}
+  * @throws IOError carrying the message of any IOException raised
+  */
+ @Override
+ public List<TRowResult> getRowWithColumnsTs(
+     ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
+     long timestamp) throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     Get request = new Get(getBytes(row));
+     if (columns != null) {
+       for (ByteBuffer column : columns) {
+         byte[][] parts = KeyValue.parseColumn(getBytes(column));
+         if (parts.length == 1) {
+           request.addFamily(parts[0]);
+         } else {
+           request.addColumn(parts[0], parts[1]);
+         }
+       }
+     }
+     request.setTimeRange(Long.MIN_VALUE, timestamp);
+     Result fetched = table.get(request);
+     return ThriftUtilities.rowResultFromHBase(fetched);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Fetches several whole rows; delegates to {@code getRowsWithColumnsTs}
+  * with no column restriction and {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public List<TRowResult> getRows(ByteBuffer tableName,
+     List<ByteBuffer> rows)
+     throws IOError {
+   final List<ByteBuffer> allColumns = null;
+   return getRowsWithColumnsTs(
+       tableName, rows, allColumns, HConstants.LATEST_TIMESTAMP);
+ }
+
+ /**
+  * Fetches the given columns of several rows; delegates to
+  * {@code getRowsWithColumnsTs} with {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
+     List<ByteBuffer> rows,
+     List<ByteBuffer> columns) throws IOError {
+   final long latest = HConstants.LATEST_TIMESTAMP;
+   return getRowsWithColumnsTs(tableName, rows, columns, latest);
+ }
+
+ /**
+  * Fetches several whole rows with a timestamp; delegates to
+  * {@code getRowsWithColumnsTs} with no column restriction.
+  */
+ @Override
+ public List<TRowResult> getRowsTs(ByteBuffer tableName,
+     List<ByteBuffer> rows,
+     long timestamp) throws IOError {
+   final List<ByteBuffer> allColumns = null;
+   return getRowsWithColumnsTs(tableName, rows, allColumns, timestamp);
+ }
+
+ /**
+  * Fetches several rows in one batch, optionally restricted to the given
+  * columns, using the time range {@code [Long.MIN_VALUE, timestamp)} as
+  * passed to {@code Get.setTimeRange}.
+  *
+  * @param tableName name of the table
+  * @param rows row keys to fetch
+  * @param columns column specifiers (family or family:qualifier); null
+  *        applies no column restriction
+  * @param timestamp upper bound handed to {@code setTimeRange}
+  * @return the results converted by
+  *         {@code ThriftUtilities.rowResultFromHBase}
+  * @throws IOError carrying the message of any IOException raised
+  */
+ @Override
+ public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
+     List<ByteBuffer> rows,
+     List<ByteBuffer> columns, long timestamp) throws IOError {
+   try {
+     List<Get> gets = new ArrayList<Get>(rows.size());
+     HTable table = getTable(tableName);
+     for (ByteBuffer row : rows) {
+       Get get = new Get(getBytes(row));
+       if (columns != null) {
+         for (ByteBuffer column : columns) {
+           byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
+           if (famAndQf.length == 1) {
+             get.addFamily(famAndQf[0]);
+           } else {
+             get.addColumn(famAndQf[0], famAndQf[1]);
+           }
+         }
+       }
+       // Apply the time range whether or not columns were given, matching
+       // getRowWithColumnsTs; previously a null column list (as passed by
+       // getRowsTs) caused the timestamp to be ignored.
+       get.setTimeRange(Long.MIN_VALUE, timestamp);
+       gets.add(get);
+     }
+     Result[] result = table.get(gets);
+     return ThriftUtilities.rowResultFromHBase(result);
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   }
+ }
+
+ /**
+  * Deletes a column (or family) of a row; delegates to {@code deleteAllTs}
+  * with {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public void deleteAll(
+     ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
+     throws IOError {
+   final long latest = HConstants.LATEST_TIMESTAMP;
+   deleteAllTs(tableName, row, column, latest);
+ }
+
+ /**
+  * Deletes a family or a single column of one row with the given timestamp.
+  * The column specifier is split by {@code KeyValue.parseColumn}; a
+  * family-only specifier deletes the family, otherwise the column.
+  *
+  * @throws IOError carrying the message of any IOException raised
+  */
+ @Override
+ public void deleteAllTs(ByteBuffer tableName,
+     ByteBuffer row,
+     ByteBuffer column,
+     long timestamp) throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     Delete removal = new Delete(getBytes(row));
+     byte[][] parts = KeyValue.parseColumn(getBytes(column));
+     boolean familyOnly = parts.length == 1;
+     if (familyOnly) {
+       removal.deleteFamily(parts[0], timestamp);
+     } else {
+       removal.deleteColumns(parts[0], parts[1], timestamp);
+     }
+     table.delete(removal);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Deletes a whole row; delegates to {@code deleteAllRowTs} with
+  * {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public void deleteAllRow(
+     ByteBuffer tableName, ByteBuffer row) throws IOError {
+   final long latest = HConstants.LATEST_TIMESTAMP;
+   deleteAllRowTs(tableName, row, latest);
+ }
+
+ /**
+  * Deletes a whole row using a {@code Delete} built from the row key and
+  * the given timestamp.
+  *
+  * @throws IOError carrying the message of any IOException raised
+  */
+ @Override
+ public void deleteAllRowTs(
+     ByteBuffer tableName, ByteBuffer row, long timestamp) throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     table.delete(new Delete(getBytes(row), timestamp, null));
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Creates a table with the given column families.
+  *
+  * @param in_tableName name of the table to create
+  * @param columnFamilies Thrift descriptors of the families to add
+  * @throws AlreadyExists if the admin client reports the table exists
+  * @throws IOError carrying the message of any IOException raised
+  * @throws IllegalArgument carrying the message of any
+  *         IllegalArgumentException raised
+  */
+ @Override
+ public void createTable(ByteBuffer in_tableName,
+     List<ColumnDescriptor> columnFamilies) throws IOError,
+     IllegalArgument, AlreadyExists {
+   byte[] name = getBytes(in_tableName);
+   try {
+     if (admin.tableExists(name)) {
+       throw new AlreadyExists("table name already in use");
+     }
+     HTableDescriptor descriptor = new HTableDescriptor(name);
+     for (ColumnDescriptor family : columnFamilies) {
+       descriptor.addFamily(ThriftUtilities.colDescFromThrift(family));
+     }
+     admin.createTable(descriptor);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   } catch (IllegalArgumentException iae) {
+     throw new IllegalArgument(iae.getMessage());
+   }
+ }
+
+ /**
+  * Deletes the named table.
+  *
+  * @param in_tableName name of the table to delete
+  * @throws IOError if the admin client reports the table does not exist, or
+  *         carrying the message of any IOException raised
+  */
+ @Override
+ public void deleteTable(ByteBuffer in_tableName) throws IOError {
+   byte[] name = getBytes(in_tableName);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("deleteTable: table=" + Bytes.toString(name));
+   }
+   try {
+     boolean exists = admin.tableExists(name);
+     if (!exists) {
+       throw new IOError("table does not exist");
+     }
+     admin.deleteTable(name);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Applies mutations to one row; delegates to {@code mutateRowTs} with
+  * {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public void mutateRow(ByteBuffer tableName, ByteBuffer row,
+     List<Mutation> mutations) throws IOError, IllegalArgument {
+   final long latest = HConstants.LATEST_TIMESTAMP;
+   mutateRowTs(tableName, row, mutations, latest);
+ }
+
+ /**
+  * Applies a list of mutations to one row with the given timestamp. Delete
+  * mutations are collected into one {@code Delete} and value mutations into
+  * one {@code Put}; the delete, if non-empty, is sent before the put.
+  *
+  * @param tableName name of the table
+  * @param row row key
+  * @param mutations mutations to apply; each column is a family or
+  *        family:qualifier specifier split by {@code KeyValue.parseColumn}
+  * @param timestamp timestamp used for the put and for each delete marker
+  * @throws IOError carrying the message of any IOException raised
+  * @throws IllegalArgument carrying the message of any
+  *         IllegalArgumentException raised
+  */
+ @Override
+ public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
+     List<Mutation> mutations, long timestamp)
+     throws IOError, IllegalArgument {
+   try {
+     HTable table = getTable(tableName);
+     Put put = new Put(getBytes(row), timestamp, null);
+     Delete delete = new Delete(getBytes(row));
+
+     for (Mutation m : mutations) {
+       byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+       if (m.isDelete) {
+         // no qualifier, family only.
+         if (famAndQf.length == 1) {
+           delete.deleteFamily(famAndQf[0], timestamp);
+         } else {
+           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+         }
+       } else {
+         byte[] qualifier = famAndQf.length == 1
+             ? HConstants.EMPTY_BYTE_ARRAY : famAndQf[1];
+         // getBytes() instead of m.value.array(): array() returns the whole
+         // backing array regardless of the buffer's position, limit and
+         // array offset, which can write bytes that are not part of the
+         // value.
+         byte[] value = m.value != null
+             ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY;
+         put.add(famAndQf[0], qualifier, value);
+       }
+     }
+     if (!delete.isEmpty()) {
+       table.delete(delete);
+     }
+     if (!put.isEmpty()) {
+       table.put(put);
+     }
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   } catch (IllegalArgumentException e) {
+     throw new IllegalArgument(e.getMessage());
+   }
+ }
+
+ /**
+  * Applies the row batches, delegating to {@link #mutateRowsTs} with
+  * {@code HConstants.LATEST_TIMESTAMP}.
+  */
+ @Override
+ public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches)
+     throws IOError, IllegalArgument, TException {
+   final long latest = HConstants.LATEST_TIMESTAMP;
+   mutateRowsTs(tableName, rowBatches, latest);
+ }
+
+ /**
+  * Applies batches of mutations to several rows of one table at the given
+  * timestamp. For each batch one Put and one Delete are built; the non-empty
+  * Puts are submitted together first, then each non-empty Delete in turn.
+  *
+  * @param tableName name of the table to mutate
+  * @param rowBatches per-row mutation batches
+  * @param timestamp timestamp used for every Put and delete marker
+  * @throws IOError if the table operation fails with an IOException
+  * @throws IllegalArgument if the table operation raises an
+  *         IllegalArgumentException
+  */
+ @Override
+ public void mutateRowsTs(
+     ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp)
+     throws IOError, IllegalArgument, TException {
+   List<Put> puts = new ArrayList<Put>();
+   List<Delete> deletes = new ArrayList<Delete>();
+
+   for (BatchMutation batch : rowBatches) {
+     byte[] row = getBytes(batch.row);
+     Delete delete = new Delete(row);
+     Put put = new Put(row, timestamp, null);
+     for (Mutation m : batch.mutations) {
+       byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+       if (m.isDelete) {
+         // no qualifier, family only.
+         if (famAndQf.length == 1) {
+           delete.deleteFamily(famAndQf[0], timestamp);
+         } else {
+           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+         }
+       } else {
+         byte[] qualifier = famAndQf.length == 1
+             ? HConstants.EMPTY_BYTE_ARRAY : famAndQf[1];
+         // Copy only the buffer's readable bytes: ByteBuffer.array() would
+         // hand back the entire backing array regardless of position/limit.
+         byte[] value = m.value != null
+             ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY;
+         put.add(famAndQf[0], qualifier, value);
+       }
+     }
+     if (!delete.isEmpty()) {
+       deletes.add(delete);
+     }
+     if (!put.isEmpty()) {
+       puts.add(put);
+     }
+   }
+
+   try {
+     HTable table = getTable(tableName);
+     if (!puts.isEmpty()) {
+       table.put(puts);
+     }
+     for (Delete del : deletes) {
+       table.delete(del);
+     }
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   } catch (IllegalArgumentException e) {
+     throw new IllegalArgument(e.getMessage());
+   }
+ }
+
+ /**
+  * Increments a counter cell addressed by a combined column name. The
+  * column is split with {@code KeyValue.parseColumn}; when only a family
+  * is present a zero-length qualifier is used.
+  */
+ @Deprecated
+ @Override
+ public long atomicIncrement(
+     ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
+     throws IOError, IllegalArgument, TException {
+   byte[][] parts = KeyValue.parseColumn(getBytes(column));
+   byte[] qualifier = (parts.length == 1) ? new byte[0] : parts[1];
+   return atomicIncrement(tableName, row, parts[0], qualifier, amount);
+ }
+
+ /**
+  * Increments the cell at (row, family, qualifier) by {@code amount} via
+  * {@code HTable.incrementColumnValue} and returns that call's result.
+  *
+  * @throws IOError if the table lookup or the increment raises an
+  *         IOException
+  */
+ protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
+     byte [] family, byte [] qualifier, long amount)
+     throws IOError, IllegalArgument, TException {
+   try {
+     HTable counterTable = getTable(tableName);
+     byte[] rowKey = getBytes(row);
+     return counterTable.incrementColumnValue(
+         rowKey, family, qualifier, amount);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Closes the scanner registered under {@code id} and removes it from the
+  * handler's scanner registry.
+  *
+  * @throws IllegalArgument if no scanner is registered under {@code id}
+  */
+ public void scannerClose(int id) throws IOError, IllegalArgument {
+   LOG.debug("scannerClose: id=" + id);
+   ResultScanner open = getScanner(id);
+   if (open != null) {
+     open.close();
+     removeScanner(id);
+     return;
+   }
+   throw new IllegalArgument("scanner ID is invalid");
+ }
+
+ /**
+  * Fetches up to {@code nbRows} rows from the scanner registered under
+  * {@code id}.
+  *
+  * @return the converted rows, or an empty list when the scanner yields null
+  * @throws IllegalArgument if no scanner is registered under {@code id}
+  * @throws IOError if advancing the scanner raises an IOException
+  */
+ @Override
+ public List<TRowResult> scannerGetList(int id,int nbRows)
+     throws IllegalArgument, IOError {
+   LOG.debug("scannerGetList: id=" + id);
+   ResultScanner scanner = getScanner(id);
+   if (scanner == null) {
+     throw new IllegalArgument("scanner ID is invalid");
+   }
+
+   Result[] batch;
+   try {
+     batch = scanner.next(nbRows);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+   if (batch == null) {
+     return new ArrayList<TRowResult>();
+   }
+   return ThriftUtilities.rowResultFromHBase(batch);
+ }
+
+ /**
+  * Fetches a single row from the scanner by delegating to
+  * {@link #scannerGetList(int, int)} with a row count of one.
+  */
+ @Override
+ public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
+   final int singleRow = 1;
+   return scannerGetList(id, singleRow);
+ }
+
+ /**
+  * Opens a scanner configured from a Thrift {@code TScan}. Only the fields
+  * that are set on {@code tScan} (start row, stop row, timestamp, caching,
+  * columns, filter string) are copied onto the HBase {@code Scan}.
+  *
+  * @return the id under which the new scanner was registered
+  * @throws IOError if any step raises an IOException
+  */
+ public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan)
+     throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     Scan hScan = new Scan();
+     if (tScan.isSetStartRow()) {
+       hScan.setStartRow(tScan.getStartRow());
+     }
+     if (tScan.isSetStopRow()) {
+       hScan.setStopRow(tScan.getStopRow());
+     }
+     if (tScan.isSetTimestamp()) {
+       hScan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
+     }
+     if (tScan.isSetCaching()) {
+       hScan.setCaching(tScan.getCaching());
+     }
+     if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
+       for (ByteBuffer columnName : tScan.getColumns()) {
+         byte[][] parts = KeyValue.parseColumn(getBytes(columnName));
+         if (parts.length != 1) {
+           hScan.addColumn(parts[0], parts[1]);
+         } else {
+           // family only: select every column in it
+           hScan.addFamily(parts[0]);
+         }
+       }
+     }
+     if (tScan.isSetFilterString()) {
+       ParseFilter filterParser = new ParseFilter();
+       Filter parsed =
+           filterParser.parseFilterString(tScan.getFilterString());
+       hScan.setFilter(parsed);
+     }
+     ResultScanner opened = table.getScanner(hScan);
+     return addScanner(opened);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Opens a scanner starting at {@code startRow}, optionally restricted to
+  * the given columns.
+  *
+  * @return the id under which the new scanner was registered
+  * @throws IOError if any step raises an IOException
+  */
+ @Override
+ public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
+     List<ByteBuffer> columns) throws IOError {
+   try {
+     HTable table = getTable(tableName);
+     Scan hScan = new Scan(getBytes(startRow));
+     if (columns != null && columns.size() != 0) {
+       for (ByteBuffer columnName : columns) {
+         byte[][] parts = KeyValue.parseColumn(getBytes(columnName));
+         if (parts.length != 1) {
+           hScan.addColumn(parts[0], parts[1]);
+         } else {
+           // family only: select every column in it
+           hScan.addFamily(parts[0]);
+         }
+       }
+     }
+     ResultScanner opened = table.getScanner(hScan);
+     return addScanner(opened);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Opens a scanner over the range from {@code startRow} to {@code stopRow},
+  * optionally restricted to the given columns.
+  *
+  * @return the id under which the new scanner was registered
+  * @throws IOError if any step raises an IOException
+  */
+ @Override
+ public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
+     ByteBuffer stopRow, List<ByteBuffer> columns)
+     throws IOError, TException {
+   try {
+     HTable table = getTable(tableName);
+     Scan hScan = new Scan(getBytes(startRow), getBytes(stopRow));
+     if (columns != null && columns.size() != 0) {
+       for (ByteBuffer columnName : columns) {
+         byte[][] parts = KeyValue.parseColumn(getBytes(columnName));
+         if (parts.length != 1) {
+           hScan.addColumn(parts[0], parts[1]);
+         } else {
+           // family only: select every column in it
+           hScan.addFamily(parts[0]);
+         }
+       }
+     }
+     ResultScanner opened = table.getScanner(hScan);
+     return addScanner(opened);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Opens a scanner that starts at {@code startAndPrefix} and carries a
+  * {@code WhileMatchFilter} wrapping a {@code PrefixFilter} on the same
+  * bytes, optionally restricted to the given columns.
+  *
+  * @return the id under which the new scanner was registered
+  * @throws IOError if any step raises an IOException
+  */
+ @Override
+ public int scannerOpenWithPrefix(ByteBuffer tableName,
+     ByteBuffer startAndPrefix,
+     List<ByteBuffer> columns)
+     throws IOError, TException {
+   try {
+     HTable table = getTable(tableName);
+     Scan hScan = new Scan(getBytes(startAndPrefix));
+     hScan.setFilter(new WhileMatchFilter(
+         new PrefixFilter(getBytes(startAndPrefix))));
+     if (columns != null && columns.size() != 0) {
+       for (ByteBuffer columnName : columns) {
+         byte[][] parts = KeyValue.parseColumn(getBytes(columnName));
+         if (parts.length != 1) {
+           hScan.addColumn(parts[0], parts[1]);
+         } else {
+           // family only: select every column in it
+           hScan.addFamily(parts[0]);
+         }
+       }
+     }
+     ResultScanner opened = table.getScanner(hScan);
+     return addScanner(opened);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Opens a scanner starting at {@code startRow} with its time range set to
+  * {@code (Long.MIN_VALUE, timestamp)}, optionally restricted to the given
+  * columns.
+  *
+  * @return the id under which the new scanner was registered
+  * @throws IOError if any step raises an IOException
+  */
+ @Override
+ public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
+     List<ByteBuffer> columns, long timestamp) throws IOError, TException {
+   try {
+     HTable table = getTable(tableName);
+     Scan hScan = new Scan(getBytes(startRow));
+     hScan.setTimeRange(Long.MIN_VALUE, timestamp);
+     if (columns != null && columns.size() != 0) {
+       for (ByteBuffer columnName : columns) {
+         byte[][] parts = KeyValue.parseColumn(getBytes(columnName));
+         if (parts.length != 1) {
+           hScan.addColumn(parts[0], parts[1]);
+         } else {
+           // family only: select every column in it
+           hScan.addFamily(parts[0]);
+         }
+       }
+     }
+     ResultScanner opened = table.getScanner(hScan);
+     return addScanner(opened);
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Opens a scanner over the range from {@code startRow} to {@code stopRow}
+  * with its time range set to {@code (Long.MIN_VALUE, timestamp)},
+  * optionally restricted to the given columns.
+  *
+  * @param tableName name of the table to scan
+  * @param startRow first row of the scan
+  * @param stopRow row at which the scan stops
+  * @param columns columns to select; null or empty selects everything
+  * @param timestamp upper bound passed to {@code Scan.setTimeRange}
+  * @return the id under which the new scanner was registered
+  * @throws IOError if any step raises an IOException
+  */
+ @Override
+ public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
+     ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp)
+     throws IOError, TException {
+   try {
+     HTable table = getTable(tableName);
+     Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+     // Set once; the original repeated this identical call after the
+     // column loop, which had no additional effect.
+     scan.setTimeRange(Long.MIN_VALUE, timestamp);
+     if (columns != null && columns.size() != 0) {
+       for (ByteBuffer column : columns) {
+         byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+         if (famQf.length == 1) {
+           scan.addFamily(famQf[0]);
+         } else {
+           scan.addColumn(famQf[0], famQf[1]);
+         }
+       }
+     }
+     return addScanner(table.getScanner(scan));
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   }
+ }
+
+ /**
+  * Returns the column families of a table as Thrift descriptors, keyed by
+  * each descriptor's {@code name} in a sorted map.
+  *
+  * @throws IOError if the table lookup or descriptor fetch raises an
+  *         IOException
+  */
+ @Override
+ public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
+     ByteBuffer tableName) throws IOError, TException {
+   try {
+     TreeMap<ByteBuffer, ColumnDescriptor> byName =
+         new TreeMap<ByteBuffer, ColumnDescriptor>();
+
+     HTableDescriptor tableDesc = getTable(tableName).getTableDescriptor();
+     for (HColumnDescriptor family : tableDesc.getFamilies()) {
+       ColumnDescriptor thriftCol = ThriftUtilities.colDescFromHbase(family);
+       byName.put(thriftCol.name, thriftCol);
+     }
+     return byName;
+   } catch (IOException ioe) {
+     throw new IOError(ioe.getMessage());
+   }
+ }
+
+ /**
+  * Looks up the given row, or the closest one before it, in one family and
+  * returns its cells.
+  *
+  * @param tableName name of the table to query
+  * @param row row key to search for
+  * @param family column family to read
+  * @return the cells of the matching row, or an empty list when the lookup
+  *         finds no row
+  * @throws IOError if the lookup raises an IOException
+  */
+ @Override
+ public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
+     ByteBuffer family) throws IOError {
+   try {
+     HTable table = getTable(getBytes(tableName));
+     Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
+     if (result == null) {
+       // No row at or before the key: avoid a NullPointerException on
+       // result.raw() and report "nothing found" as an empty list.
+       return new ArrayList<TCell>();
+     }
+     return ThriftUtilities.cellFromHBase(result.raw());
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   }
+ }
+
+ /**
+  * Looks up the catalog row at or before {@code searchRow} in the META
+  * table and converts it into a Thrift {@code TRegionInfo}. Start key, end
+  * key, id, name and version are always filled in; server name and port are
+  * set only when the catalog row carries a non-empty server value.
+  *
+  * @param searchRow row key to look up in the META table
+  * @return the region description built from the catalog row
+  * @throws IOError if the lookup fails, no catalog row is found, or the
+  *         row has no region info value
+  */
+ @Override
+ public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+   try {
+     HTable table = getTable(HConstants.META_TABLE_NAME);
+     // Copy only the buffer's readable bytes: ByteBuffer.array() would
+     // hand back the entire backing array regardless of position/limit.
+     byte[] row = getBytes(searchRow);
+     Result startRowResult = table.getRowOrBefore(
+         row, HConstants.CATALOG_FAMILY);
+
+     if (startRowResult == null) {
+       throw new IOException("Cannot find row in .META., row="
+           + Bytes.toString(row));
+     }
+
+     // find region start and end keys
+     byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+         HConstants.REGIONINFO_QUALIFIER);
+     if (value == null || value.length == 0) {
+       throw new IOException("HRegionInfo REGIONINFO was null or " +
+           " empty in Meta for row="
+           + Bytes.toString(row));
+     }
+     HRegionInfo regionInfo = Writables.getHRegionInfo(value);
+     TRegionInfo region = new TRegionInfo();
+     region.setStartKey(regionInfo.getStartKey());
+     region.setEndKey(regionInfo.getEndKey());
+     region.id = regionInfo.getRegionId();
+     region.setName(regionInfo.getRegionName());
+     region.version = regionInfo.getVersion();
+
+     // find region assignment to server
+     value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+         HConstants.SERVER_QUALIFIER);
+     if (value != null && value.length > 0) {
+       String hostAndPort = Bytes.toString(value);
+       region.setServerName(Bytes.toBytes(
+           Addressing.parseHostname(hostAndPort)));
+       region.port = Addressing.parsePort(hostAndPort);
+     }
+     return region;
+   } catch (IOException e) {
+     throw new IOError(e.getMessage());
+   }
+ }
+ }
+}