You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:44:10 UTC
svn commit: r1181955 [1/3] - in /hbase/branches/0.89/src/main:
java/org/apache/hadoop/hbase/regionserver/
java/org/apache/hadoop/hbase/thrift/
java/org/apache/hadoop/hbase/thrift/generated/
resources/org/apache/hadoop/hbase/thrift/
Author: nspiegelberg
Date: Tue Oct 11 17:44:09 2011
New Revision: 1181955
URL: http://svn.apache.org/viewvc?rev=1181955&view=rev
Log:
Committing dgoode's patch: Wormhole Support in HBase89 Thrift
Summary:Added the single-hop regionserver from 90 to 89 and backported the necessary thrift calls. Regenerated the thrift files. Diff is a commit above D296169 although it can be separated, with the exception of thrift generation which must be redone.
Added forgotten shutdown, put after master-startup-wait to avoid errors.
Reviewed By:kranganathan
Test Plan:Chip will run his wormhole unit tests. I will also attempt to test some of these functionalities from WWW on my dev cluster.
Per Chip: it works! test passes!
Revert Plan:OK
- end platform impact -
Added:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
Modified:
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRegionInfo.java
hbase/branches/0.89/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181955&r1=1181954&r2=1181955&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 17:44:09 2011
@@ -263,6 +263,9 @@ public class HRegionServer implements HR
private final AtomicLong globalMemstoreSize = new AtomicLong(0);
+ // reference to the Thrift Server.
+ volatile private HRegionThriftServer thriftServer;
+
/**
* Starts a HRegionServer at the default location
* @param conf
@@ -589,6 +592,10 @@ public class HRegionServer implements HR
abort("Unhandled exception", t);
}
}
+ // shutdown thriftserver
+ if (thriftServer != null) {
+ thriftServer.shutdown();
+ }
this.leases.closeAfterLeasesExpire();
this.worker.stop();
this.server.stop();
@@ -764,6 +771,14 @@ public class HRegionServer implements HR
this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
startServiceThreads();
isOnline = true;
+
+ // Create the thread for the ThriftServer.
+ // NOTE this defaults to FALSE so you have to enable it in conf
+ if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
+ thriftServer = new HRegionThriftServer(this, conf);
+ thriftServer.start();
+ LOG.info("Started Thrift API from Region Server.");
+ }
} catch (Throwable e) {
this.isOnline = false;
this.stopRequested.set(true);
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1181955&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Tue Oct 11 17:44:09 2011
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
+import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
+import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
+import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.hbase.thrift.ThriftServer;
+import org.apache.hadoop.hbase.thrift.ThriftServer.HBaseHandler;
+import org.apache.hadoop.hbase.thrift.ThriftUtilities;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * HRegionThriftServer - this class starts up a Thrift server in the same
+ * JVM where the RegionServer is running. It inherits most of the
+ * functionality from the standard ThriftServer. This is good because
+ * we can maintain compatibility with applications that use the
+ * standard Thrift interface. For performance reasons, we override
+ * methods to directly invoke calls into the HRegionServer.
+ */
+public class HRegionThriftServer extends Thread {
+
+  public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
+  public static final int DEFAULT_LISTEN_PORT = 9091; // when thrift.port unset
+
+  private HRegionServer rs; // target of the short-circuited handler calls
+  private Configuration conf;
+  private volatile TServer tserver; // created in run(), cleared in shutdown()
+
+  private int port; // this and the fields below are loaded by initialize()
+  private boolean nonblocking;
+  private String bindIpAddress;
+  private String transport;
+  private String protocol;
+  private boolean redirect; // fall back to the stock handler when not serving
+
+  /**
+   * Builds the glue object tying a RegionServer to the stock ThriftServer code.
+   * Only stores the two references; listener settings are read by initialize().
+   */
+  HRegionThriftServer(HRegionServer regionServer, Configuration conf) {
+    this.conf = conf;
+    this.rs = regionServer;
+  }
+
+ /**
+ * Inherit the Handler from the standard ThriftServer. This allows us
+ * to use the default implementation for most calls. We override certain calls
+ * for performance reasons
+ */
+ private class HBaseHandlerRegion extends ThriftServer.HBaseHandler {
+
+ HBaseHandlerRegion(final Configuration conf) throws IOException {
+ super(conf);
+ initialize(conf); // NOTE(review): presumably the enclosing initialize(); run() reads its fields
+ }
+
+ /**
+ * Do increments. Shortcircuit to get better performance.
+ */
+ @Override
+ public long atomicIncrement(byte[] tableName, byte [] row, byte [] family,
+ byte [] qualifier, long amount)
+ throws IOError, IllegalArgument, TException {
+ try {
+ HTable table = getTable(tableName);
+ HRegionLocation location = table.getRegionLocation(row);
+ byte[] regionName = location.getRegionInfo().getRegionName();
+
+ return rs.incrementColumnValue(regionName, row, family,
+ qualifier, amount, true);
+ } catch (NotServingRegionException e) {
+ if (!redirect) {
+ throw new IOError(e.getMessage());
+ }
+ LOG.info("ThriftServer redirecting atomicIncrement");
+ return super.atomicIncrement(tableName, row, family,
+ qualifier, amount);
+ } catch (IOException e) {
+ throw new IOError(e.getMessage());
+ }
+ }
+
+ /**
+ * Get a record. Shortcircuit to get better performance.
+ */
+ @Override
+ public List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row,
+ List<byte[]> columns,
+ long timestamp)
+ throws IOError {
+ try {
+ HTable table = getTable(tableName);
+ HRegionLocation location = table.getRegionLocation(row);
+ byte[] regionName = location.getRegionInfo().getRegionName();
+
+ if (columns == null) {
+ Get get = new Get(row);
+ get.setTimeRange(Long.MIN_VALUE, timestamp);
+ Result result = rs.get(regionName, get);
+ return ThriftUtilities.rowResultFromHBase(result);
+ }
+ byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
+ Get get = new Get(row);
+ for (byte[] column : columnArr) {
+ byte[][] famAndQf = KeyValue.parseColumn(column);
+ if (famAndQf.length == 1) {
+ get.addFamily(famAndQf[0]);
+ } else {
+ get.addColumn(famAndQf[0], famAndQf[1]);
+ }
+ }
+ get.setTimeRange(Long.MIN_VALUE, timestamp);
+ Result result = rs.get(regionName, get);
+ return ThriftUtilities.rowResultFromHBase(result);
+ } catch (NotServingRegionException e) {
+ if (!redirect) {
+ throw new IOError(e.getMessage());
+ }
+ LOG.info("ThriftServer redirecting getRowWithColumnsTs");
+ return super.getRowWithColumnsTs(tableName, row, columns, timestamp);
+ } catch (IOException e) {
+ throw new IOError(e.getMessage());
+ }
+ }
+    @Override // delegates to the Ts variant with HConstants.LATEST_TIMESTAMP
+    public List<TRowResult> getRowWithColumnPrefix(byte[] tableName,
+        byte[] row, byte[] prefix) throws IOError {
+      final long newest = HConstants.LATEST_TIMESTAMP;
+      return getRowWithColumnPrefixTs(tableName, row, prefix, newest);
+    }
+
+    @Override
+    public List<TRowResult> getRowWithColumnPrefixTs(byte[] tableName,
+        byte[] row, byte[] prefix, long timestamp) throws IOError {
+      // NOTE(review): reads through HTable rather than rs, and matches the
+      // ThriftServer.HBaseHandler version added in this same commit.
+      try {
+        final HTable table = getTable(tableName);
+        final Get get = new Get(row);
+        // A null prefix fetches the row with no family or filter set.
+        if (prefix != null) {
+          // Two parts: restrict to family [0], ColumnPrefixFilter on [1];
+          // otherwise ColumnPrefixFilter on the single part.
+          final byte[][] famAndPrefix = KeyValue.parseColumn(prefix);
+          if (famAndPrefix.length == 2) {
+            get.addFamily(famAndPrefix[0]);
+            get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
+          } else {
+            get.setFilter(new ColumnPrefixFilter(famAndPrefix[0]));
+          }
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        return ThriftUtilities.rowResultFromHBase(table.get(get));
+      } catch (IOException ioe) {
+        throw new IOError(ioe.getMessage());
+      }
+    }
+
+ }
+
+  /**
+   * Loads the hbase.regionserver.thrift.* listener settings into fields.
+   */
+  private void initialize(Configuration conf) {
+    this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress");
+    this.transport = conf.get("hbase.regionserver.thrift.transport");
+    this.protocol = conf.get("hbase.regionserver.thrift.protocol");
+    this.port = conf.getInt("hbase.regionserver.thrift.port",
+        DEFAULT_LISTEN_PORT);
+    this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
+        false);
+    this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking",
+        false);
+  }
+
+ /**
+ * Stop ThriftServer
+ */
+ void shutdown() {
+ if (tserver != null) {
+ tserver.stop();
+ tserver = null;
+ }
+ }
+
+  /**
+   * Thread body: builds the handler and a Thrift server from the loaded
+   * settings, then calls serve() on it.
+   */
+  public void run() {
+    // Setup and serving share one catch: any failure is logged, not rethrown.
+    try {
+      // NOTE(review): the handler constructor calls initialize(conf), which
+      // presumably is what populates port/protocol/transport read below.
+      final HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf);
+      final Hbase.Processor processor = new Hbase.Processor(handler);
+
+      // "compact" selects TCompactProtocol; anything else (or unset) binary.
+      final TProtocolFactory protocolFactory = "compact".equals(this.protocol)
+          ? new TCompactProtocol.Factory()
+          : new TBinaryProtocol.Factory();
+
+      if (this.nonblocking) {
+        LOG.info("starting HRegionServer Nonblocking Thrift server on " +
+            this.port);
+        LOG.info("HRegionServer Nonblocking Thrift server does not " +
+            "support address binding.");
+        final TNonblockingServerTransport socket =
+            new TNonblockingServerSocket(this.port);
+        // The nonblocking server is always given the framed transport.
+        tserver = new TNonblockingServer(processor, socket,
+            new TFramedTransport.Factory(), protocolFactory);
+      } else {
+        // Bind to the configured address, else to the local host address.
+        final InetAddress listenAddress = (this.bindIpAddress == null)
+            ? InetAddress.getLocalHost()
+            : InetAddress.getByName(this.bindIpAddress);
+        final TServerTransport socket = new TServerSocket(
+            new InetSocketAddress(listenAddress, port));
+
+        // "framed" selects TFramedTransport; otherwise the plain factory.
+        final TTransportFactory transportFactory =
+            "framed".equals(this.transport)
+                ? new TFramedTransport.Factory()
+                : new TTransportFactory();
+        LOG.info("starting HRegionServer ThreadPool Thrift server on " +
+            listenAddress + ":" + this.port);
+        tserver = new TThreadPoolServer(processor, socket,
+            transportFactory, protocolFactory);
+      }
+      tserver.serve();
+    } catch (Exception e) {
+      LOG.warn("Unable to start HRegionServerThrift interface.", e);
+    }
+  }
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1181955&r1=1181954&r2=1181955&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Tue Oct 11 17:44:09 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.thrift.ge
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.thrift.generated.TScan;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -186,10 +188,14 @@ public class ThriftServer {
/**
* Constructs an HBaseHandler object.
*
- * @throws MasterNotRunningException
+ * @throws IOException
*/
- HBaseHandler() throws MasterNotRunningException {
- conf = HBaseConfiguration.create();
+ protected HBaseHandler() throws IOException {
+ this(HBaseConfiguration.create());
+ }
+
+ protected HBaseHandler(final Configuration c) throws IOException {
+ this.conf = c;
admin = new HBaseAdmin(conf);
scannerMap = new HashMap<Integer, ResultScanner>();
}
@@ -261,6 +267,12 @@ public class ThriftServer {
region.id = regionInfo.getRegionId();
region.name = regionInfo.getRegionName();
region.version = regionInfo.getVersion();
+ HServerAddress server = regionsInfo.get(regionInfo);
+ if (server != null) {
+ byte[] hostname = Bytes.toBytes(server.getHostname());
+ region.serverName = hostname;
+ region.port = server.getPort();
+ }
regions.add(region);
}
return regions;
@@ -393,6 +405,40 @@ public class ThriftServer {
}
}
+    @Override // same read as the Ts variant, using HConstants.LATEST_TIMESTAMP
+    public List<TRowResult> getRowWithColumnPrefix(byte[] tableName,
+        byte[] row, byte[] prefix) throws IOError {
+      final long newest = HConstants.LATEST_TIMESTAMP;
+      return getRowWithColumnPrefixTs(tableName, row, prefix, newest);
+    }
+
+    @Override
+    public List<TRowResult> getRowWithColumnPrefixTs(byte[] tableName,
+        byte[] row, byte[] prefix, long timestamp) throws IOError {
+      try {
+        final HTable table = getTable(tableName);
+        final Get get = new Get(row);
+        // A null prefix fetches the row with no family or filter set.
+        if (prefix != null) {
+          final byte[][] famAndPrefix = KeyValue.parseColumn(prefix);
+          if (famAndPrefix.length == 2) {
+            // Two parts: restrict to family [0] and apply
+            // ColumnPrefixFilter with part [1].
+            get.addFamily(famAndPrefix[0]);
+            get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
+          } else {
+            // Otherwise part [0] alone feeds the ColumnPrefixFilter.
+            get.setFilter(new ColumnPrefixFilter(famAndPrefix[0]));
+          }
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        return ThriftUtilities.rowResultFromHBase(table.get(get));
+      } catch (IOException ioe) {
+        // Only the message survives the conversion to the Thrift IOError.
+        throw new IOError(ioe.getMessage());
+      }
+    }
+
public void deleteAll(byte[] tableName, byte[] row, byte[] column)
throws IOError {
deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
@@ -944,6 +990,51 @@ public class ThriftServer {
throw new IOError(e.getMessage());
}
}
+
+    /**
+     * Looks up the .META. row at or before searchRow and converts it to a
+     * TRegionInfo; serverName/port are set only if a server is recorded.
+     */
+    @Override
+    public TRegionInfo getRegionInfo(byte[] searchRow) throws IOError,
+        TException {
+      try {
+        HTable table = getTable(HConstants.META_TABLE_NAME);
+        Result startRowResult = table.getRowOrBefore(searchRow,
+            HConstants.CATALOG_FAMILY);
+        if (startRowResult == null) {
+          throw new IOException("Cannot find row in .META., row="
+              + Bytes.toString(searchRow));
+        }
+        // find region start and end keys
+        byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        if (value == null || value.length == 0) {
+          throw new IOException("HRegionInfo REGIONINFO was null or "
+              + "empty in Meta for row=" + Bytes.toString(searchRow));
+        }
+        HRegionInfo regionInfo = Writables.getHRegionInfo(value);
+        TRegionInfo region = new TRegionInfo();
+        region.setStartKey(regionInfo.getStartKey());
+        region.setEndKey(regionInfo.getEndKey());
+        region.id = regionInfo.getRegionId();
+        region.setName(regionInfo.getRegionName());
+        region.version = regionInfo.getVersion();
+
+        // find region assignment to server (the cell may be missing or empty)
+        value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.SERVER_QUALIFIER);
+        if (value != null && value.length > 0) {
+          HServerAddress server = new HServerAddress(Bytes.toString(value));
+          region.serverName = Bytes.toBytes(server.getHostname());
+          region.port = server.getPort();
+        }
+        return region;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
}
//