You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/13 22:28:22 UTC
svn commit: r1325937 [1/7] - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/catalog/
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/filter/
main/java/org/apache/hadoop/hbase/io/
main/java/org/apache/hadoop/hbase/ipc...
Author: stack
Date: Fri Apr 13 20:28:21 2012
New Revision: 1325937
URL: http://svn.apache.org/viewvc?rev=1325937&view=rev
Log:
HBASE-5443 Convert the client protocol of HRegionInterface to PB
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java
hbase/trunk/src/main/protobuf/Admin.proto
hbase/trunk/src/main/protobuf/Client.proto
Removed:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionAdminProtos.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionClientProtos.java
hbase/trunk/src/main/protobuf/RegionAdmin.proto
hbase/trunk/src/main/protobuf/RegionClient.proto
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
hbase/trunk/src/main/protobuf/hbase.proto
hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/OOMERegionServer.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java Fri Apr 13 20:28:21 2012
@@ -31,18 +31,15 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.ipc.RemoteException;
/**
* Reads region and assignment information from <code>.META.</code>.
@@ -253,19 +250,6 @@ public class MetaReader {
}
/**
- * Reads the location of META from ROOT.
- * @param metaServer connection to server hosting ROOT
- * @return location of META in ROOT where location, or null if not available
- * @throws IOException
- * @deprecated Does not retry; use #getMetaRegionLocation(CatalogTracker)
- */
- public static ServerName readMetaLocation(HRegionInterface metaServer)
- throws IOException {
- return readLocation(metaServer, CatalogTracker.ROOT_REGION_NAME,
- CatalogTracker.META_REGION_NAME);
- }
-
- /**
* Gets the location of <code>.META.</code> region by reading content of
* <code>-ROOT-</code>.
* @param ct
@@ -292,50 +276,6 @@ public class MetaReader {
return (pair == null || pair.getSecond() == null)? null: pair.getSecond();
}
- // TODO: Remove when deprecated dependencies are removed.
- private static ServerName readLocation(HRegionInterface metaServer,
- byte [] catalogRegionName, byte [] regionName)
- throws IOException {
- Result r = null;
- try {
- r = metaServer.get(catalogRegionName,
- new Get(regionName).
- addColumn(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER).
- addColumn(HConstants.CATALOG_FAMILY,
- HConstants.STARTCODE_QUALIFIER));
- } catch (java.net.SocketTimeoutException e) {
- // Treat this exception + message as unavailable catalog table. Catch it
- // and fall through to return a null
- } catch (java.net.SocketException e) {
- // Treat this exception + message as unavailable catalog table. Catch it
- // and fall through to return a null
- } catch (RemoteException re) {
- IOException ioe = re.unwrapRemoteException();
- if (ioe instanceof NotServingRegionException) {
- // Treat this NSRE as unavailable table. Catch and fall through to
- // return null below
- } else if (ioe.getMessage().contains("Server not running")) {
- // Treat as unavailable table.
- } else {
- throw re;
- }
- } catch (IOException e) {
- if (e.getCause() != null && e.getCause() instanceof IOException &&
- e.getCause().getMessage() != null &&
- e.getCause().getMessage().contains("Connection reset by peer")) {
- // Treat this exception + message as unavailable catalog table. Catch it
- // and fall through to return a null
- } else {
- throw e;
- }
- }
- if (r == null || r.isEmpty()) {
- return null;
- }
- return getServerNameFromCatalogResult(r);
- }
-
/**
* Gets the region info and assignment for the specified region.
* @param catalogTracker
@@ -655,35 +595,6 @@ public class MetaReader {
}
/**
- * Fully scan a given region, on a given server starting with given row.
- * @param hRegionInterface region server
- * @param visitor visitor
- * @param regionName name of region
- * @param startrow start row
- * @throws IOException
- * @deprecated Does not retry; use fullScan xxx instead.
- x
- */
- public static void fullScan(HRegionInterface hRegionInterface,
- Visitor visitor, final byte[] regionName,
- byte[] startrow) throws IOException {
- if (hRegionInterface == null) return;
- Scan scan = new Scan();
- if (startrow != null) scan.setStartRow(startrow);
- scan.addFamily(HConstants.CATALOG_FAMILY);
- long scannerid = hRegionInterface.openScanner(regionName, scan);
- try {
- Result data;
- while((data = hRegionInterface.next(scannerid)) != null) {
- if (!data.isEmpty()) visitor.visit(data);
- }
- } finally {
- hRegionInterface.close(scannerid);
- }
- return;
- }
-
- /**
* Performs a full scan of a catalog table.
* @param catalogTracker
* @param visitor Visitor invoked against each row.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Fri Apr 13 20:28:21 2012
@@ -307,6 +307,7 @@ public class ClientScanner extends Abstr
}
// Clear region
this.currentRegion = null;
+ callable = null;
continue;
}
long currentTime = System.currentTimeMillis();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Fri Apr 13 20:28:21 2012
@@ -19,6 +19,17 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -45,6 +56,12 @@ import org.apache.hadoop.hbase.catalog.M
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@@ -54,16 +71,7 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketTimeoutException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Pattern;
+import com.google.protobuf.ServiceException;
/**
* Provides an interface to manage HBase database table metadata + general
@@ -497,23 +505,27 @@ public class HBaseAdmin implements Abort
});
// Wait until all regions deleted
- HRegionInterface server =
- connection.getHRegionConnection(firstMetaServer.getHostname(), firstMetaServer.getPort());
+ ClientProtocol server =
+ connection.getClient(firstMetaServer.getHostname(), firstMetaServer.getPort());
for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
- long scannerId = -1L;
try {
Scan scan = MetaReader.getScanForTableName(tableName);
- scan.addColumn(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- scannerId = server.openScanner(
- firstMetaServer.getRegionInfo().getRegionName(), scan);
+ scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+ ScanRequest request = RequestConverter.buildScanRequest(
+ firstMetaServer.getRegionInfo().getRegionName(), scan, 1, true);
+ Result[] values = null;
// Get a batch at a time.
- Result values = server.next(scannerId);
+ try {
+ ScanResponse response = server.scan(null, request);
+ values = ResponseConverter.getResults(response);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
// let us wait until .META. table is updated and
// HMaster removes the table from its HTableDescriptors
- if (values == null) {
+ if (values == null || values.length == 0) {
boolean tableExists = false;
HTableDescriptor[] htds;
MasterKeepAliveConnection master = connection.getKeepAliveMaster();
@@ -542,14 +554,6 @@ public class HBaseAdmin implements Abort
throw ex;
}
}
- } finally {
- if (scannerId != -1L) {
- try {
- server.close(scannerId);
- } catch (IOException ex) {
- LOG.warn(ex);
- }
- }
}
try {
Thread.sleep(getPauseTime(tries));
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Fri Apr 13 20:28:21 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.co
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@@ -221,6 +222,19 @@ public interface HConnection extends Abo
throws IOException;
/**
+ * Establishes a connection to the region server at the specified address, and return
+ * a region client protocol.
+ *
+ * @param hostname RegionServer hostname
+ * @param port RegionServer port
+ * @return ClientProtocol proxy for RegionServer
+ * @throws IOException if a remote or network exception occurs
+ *
+ */
+ public ClientProtocol getClient(final String hostname, final int port)
+ throws IOException;
+
+ /**
* Establishes a connection to the region server at the specified address.
* @param regionServer - the server to connect to
* @param getMaster - do we check if master is alive
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Apr 13 20:28:21 2012
@@ -19,6 +19,33 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -46,6 +73,13 @@ import org.apache.hadoop.hbase.ipc.ExecR
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@@ -61,32 +95,7 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.protobuf.ServiceException;
/**
* A non-instantiable class that manages {@link HConnection}s.
@@ -146,6 +155,12 @@ public class HConnectionManager {
public static final int MAX_CACHED_HBASE_INSTANCES;
+ /** Parameter name for what client protocol to use. */
+ public static final String CLIENT_PROTOCOL_CLASS = "hbase.clientprotocol.class";
+
+ /** Default client protocol class name. */
+ public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
+
private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
static {
@@ -493,6 +508,7 @@ public class HConnectionManager {
static class HConnectionImplementation implements HConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
+ private final Class<? extends ClientProtocol> clientClass;
private final long pause;
private final int numRetries;
private final int maxRPCAttempts;
@@ -521,8 +537,8 @@ public class HConnectionManager {
private final Configuration conf;
// Known region HServerAddress.toString() -> HRegionInterface
- private final Map<String, HRegionInterface> servers =
- new ConcurrentHashMap<String, HRegionInterface>();
+ private final ConcurrentHashMap<String, Map<String, VersionedProtocol>> servers =
+ new ConcurrentHashMap<String, Map<String, VersionedProtocol>>();
private final ConcurrentHashMap<String, String> connectionLock =
new ConcurrentHashMap<String, String>();
@@ -570,6 +586,15 @@ public class HConnectionManager {
throw new UnsupportedOperationException(
"Unable to find region server interface " + serverClassName, e);
}
+ String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
+ DEFAULT_CLIENT_PROTOCOL_CLASS);
+ try {
+ this.clientClass =
+ (Class<? extends ClientProtocol>) Class.forName(clientClassName);
+ } catch (ClassNotFoundException e) {
+ throw new UnsupportedOperationException(
+ "Unable to find client protocol " + clientClassName, e);
+ }
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -1329,18 +1354,28 @@ public class HConnectionManager {
}
@Override
+ public ClientProtocol getClient(
+ final String hostname, final int port) throws IOException {
+ return (ClientProtocol)getProtocol(hostname, port,
+ clientClass, ClientProtocol.VERSION);
+ }
+
+ @Override
@Deprecated
public HRegionInterface getHRegionConnection(HServerAddress hsa,
boolean master)
throws IOException {
- return getHRegionConnection(null, -1, hsa.getInetSocketAddress(), master);
+ String hostname = hsa.getInetSocketAddress().getHostName();
+ int port = hsa.getInetSocketAddress().getPort();
+ return getHRegionConnection(hostname, port, master);
}
@Override
public HRegionInterface getHRegionConnection(final String hostname,
final int port, final boolean master)
throws IOException {
- return getHRegionConnection(hostname, port, null, master);
+ return (HRegionInterface)getProtocol(hostname, port,
+ serverInterfaceClass, HRegionInterface.VERSION);
}
/**
@@ -1348,43 +1383,44 @@ public class HConnectionManager {
* can be but not both.
* @param hostname
* @param port
- * @param isa
- * @param master
+ * @param protocolClass
+ * @param version
* @return Proxy.
* @throws IOException
*/
- HRegionInterface getHRegionConnection(final String hostname, final int port,
- final InetSocketAddress isa, final boolean master)
- throws IOException {
- HRegionInterface server;
- String rsName = null;
- if (isa != null) {
- rsName = Addressing.createHostAndPortStr(isa.getHostName(),
- isa.getPort());
- } else {
- rsName = Addressing.createHostAndPortStr(hostname, port);
- }
+ VersionedProtocol getProtocol(final String hostname,
+ final int port, final Class <? extends VersionedProtocol> protocolClass,
+ final long version) throws IOException {
+ String rsName = Addressing.createHostAndPortStr(hostname, port);
// See if we already have a connection (common case)
- server = this.servers.get(rsName);
+ Map<String, VersionedProtocol> protocols = this.servers.get(rsName);
+ if (protocols == null) {
+ protocols = new HashMap<String, VersionedProtocol>();
+ Map<String, VersionedProtocol> existingProtocols =
+ this.servers.putIfAbsent(rsName, protocols);
+ if (existingProtocols != null) {
+ protocols = existingProtocols;
+ }
+ }
+ String protocol = protocolClass.getName();
+ VersionedProtocol server = protocols.get(protocol);
if (server == null) {
- // create a unique lock for this RS (if necessary)
- this.connectionLock.putIfAbsent(rsName, rsName);
+ // create a unique lock for this RS + protocol (if necessary)
+ String lockKey = protocol + "@" + rsName;
+ this.connectionLock.putIfAbsent(lockKey, lockKey);
// get the RS lock
- synchronized (this.connectionLock.get(rsName)) {
+ synchronized (this.connectionLock.get(lockKey)) {
// do one more lookup in case we were stalled above
- server = this.servers.get(rsName);
+ server = protocols.get(protocol);
if (server == null) {
try {
// Only create isa when we need to.
- InetSocketAddress address = isa != null? isa:
- new InetSocketAddress(hostname, port);
+ InetSocketAddress address = new InetSocketAddress(hostname, port);
// definitely a cache miss. establish an RPC for this RS
- server = (HRegionInterface) HBaseRPC.waitForProxy(
- serverInterfaceClass, HRegionInterface.VERSION,
- address, this.conf,
+ server = HBaseRPC.waitForProxy(
+ protocolClass, version, address, this.conf,
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
- this.servers.put(Addressing.createHostAndPortStr(
- address.getHostName(), address.getPort()), server);
+ protocols.put(protocol, server);
} catch (RemoteException e) {
LOG.warn("RemoteException connecting to RS", e);
// Throw what the RemoteException was carrying.
@@ -1679,11 +1715,42 @@ public class HConnectionManager {
ServerCallable<MultiResponse> callable =
new ServerCallable<MultiResponse>(connection, tableName, null) {
public MultiResponse call() throws IOException {
- return server.multi(multi);
+ try {
+ MultiResponse response = new MultiResponse();
+ for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
+ byte[] regionName = e.getKey();
+ int rowMutations = 0;
+ List<Action<R>> actions = e.getValue();
+ for (Action<R> action: actions) {
+ Row row = action.getAction();
+ if (row instanceof RowMutations) {
+ MultiRequest request =
+ RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
+ server.multi(null, request);
+ response.add(regionName, action.getOriginalIndex(), new Result());
+ rowMutations++;
+ }
+ }
+ if (actions.size() > rowMutations) {
+ MultiRequest request =
+ RequestConverter.buildMultiRequest(regionName, actions);
+ ClientProtos.MultiResponse
+ proto = server.multi(null, request);
+ List<Object> results = ResponseConverter.getResults(proto);
+ for (int i = 0, n = results.size(); i < n; i++) {
+ int originalIndex = actions.get(i).getOriginalIndex();
+ response.add(regionName, originalIndex, results.get(i));
+ }
+ }
+ }
+ return response;
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
@Override
public void connect(boolean reload) throws IOException {
- server = connection.getHRegionConnection(
+ server = connection.getClient(
loc.getHostname(), loc.getPort());
}
};
@@ -1885,7 +1952,7 @@ public class HConnectionManager {
}
}
} catch (ExecutionException e) {
- LOG.warn("Failed all from " + loc, e);
+ LOG.debug("Failed all from " + loc, e);
}
}
@@ -2077,8 +2144,10 @@ public class HConnectionManager {
delayedClosing.stop("Closing connection");
if (stopProxy) {
closeMaster();
- for (HRegionInterface i : servers.values()) {
- HBaseRPC.stopProxy(i);
+ for (Map<String, VersionedProtocol> i : servers.values()) {
+ for (VersionedProtocol server: i.values()) {
+ HBaseRPC.stopProxy(server);
+ }
}
}
closeZooKeeperWatcher();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri Apr 13 20:28:21 2012
@@ -53,13 +53,27 @@ import org.apache.hadoop.hbase.ServerNam
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.CompareType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
+import com.google.protobuf.ServiceException;
+
/**
* <p>Used to communicate with a single HBase table.
*
@@ -648,8 +662,15 @@ public class HTable implements HTableInt
throws IOException {
return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
public Result call() throws IOException {
- return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
- row, family);
+ try {
+ GetRequest request = RequestConverter.buildGetRequest(
+ location.getRegionInfo().getRegionName(), row, family, true);
+ GetResponse response = server.get(null, request);
+ if (!response.hasResult()) return null;
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -694,7 +715,14 @@ public class HTable implements HTableInt
public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
- return server.get(location.getRegionInfo().getRegionName(), get);
+ try {
+ GetRequest request = RequestConverter.buildGetRequest(
+ location.getRegionInfo().getRegionName(), get);
+ GetResponse response = server.get(null, request);
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -746,13 +774,18 @@ public class HTable implements HTableInt
@Override
public void delete(final Delete delete)
throws IOException {
- new ServerCallable<Void>(connection, tableName, delete.getRow(),
- operationTimeout) {
- public Void call() throws IOException {
- server.delete(location.getRegionInfo().getRegionName(), delete);
- return null;
- }
- }.withRetries();
+ new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
+ public Boolean call() throws IOException {
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ location.getRegionInfo().getRegionName(), delete);
+ MutateResponse response = server.mutate(null, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ }.withRetries();
}
/**
@@ -821,7 +854,13 @@ public class HTable implements HTableInt
new ServerCallable<Void>(connection, tableName, rm.getRow(),
operationTimeout) {
public Void call() throws IOException {
- server.mutateRow(location.getRegionInfo().getRegionName(), rm);
+ try {
+ MultiRequest request = RequestConverter.buildMultiRequest(
+ location.getRegionInfo().getRegionName(), rm);
+ server.multi(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
return null;
}
}.withRetries();
@@ -838,8 +877,15 @@ public class HTable implements HTableInt
}
return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
public Result call() throws IOException {
- return server.append(
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), append);
+ MutateResponse response = server.mutate(null, request);
+ if (!response.hasResult()) return null;
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -855,8 +901,14 @@ public class HTable implements HTableInt
}
return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
public Result call() throws IOException {
- return server.increment(
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), increment);
+ MutateResponse response = server.mutate(null, request);
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -890,9 +942,16 @@ public class HTable implements HTableInt
}
return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
public Long call() throws IOException {
- return server.incrementColumnValue(
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
location.getRegionInfo().getRegionName(), row, family,
qualifier, amount, writeToWAL);
+ MutateResponse response = server.mutate(null, request);
+ Result result = ProtobufUtil.toResult(response.getResult());
+ return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -907,8 +966,15 @@ public class HTable implements HTableInt
throws IOException {
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
- return server.checkAndPut(location.getRegionInfo().getRegionName(),
- row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ location.getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), CompareType.EQUAL, put);
+ MutateResponse response = server.mutate(null, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -924,10 +990,15 @@ public class HTable implements HTableInt
throws IOException {
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
- return server.checkAndDelete(
- location.getRegionInfo().getRegionName(),
- row, family, qualifier, value, delete)
- ? Boolean.TRUE : Boolean.FALSE;
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ location.getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), CompareType.EQUAL, delete);
+ MutateResponse response = server.mutate(null, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -939,8 +1010,14 @@ public class HTable implements HTableInt
public boolean exists(final Get get) throws IOException {
return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
public Boolean call() throws IOException {
- return server.
- exists(location.getRegionInfo().getRegionName(), get);
+ try {
+ GetRequest request = RequestConverter.buildGetRequest(
+ location.getRegionInfo().getRegionName(), get, true);
+ GetResponse response = server.get(null, request);
+ return response.getExists();
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -1026,9 +1103,14 @@ public class HTable implements HTableInt
throws IOException {
return new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
public RowLock call() throws IOException {
- long lockId =
- server.lockRow(location.getRegionInfo().getRegionName(), row);
- return new RowLock(row,lockId);
+ try {
+ LockRowRequest request = RequestConverter.buildLockRowRequest(
+ location.getRegionInfo().getRegionName(), row);
+ LockRowResponse response = server.lockRow(null, request);
+ return new RowLock(row, response.getLockId());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
}.withRetries();
}
@@ -1039,14 +1121,18 @@ public class HTable implements HTableInt
@Override
public void unlockRow(final RowLock rl)
throws IOException {
- new ServerCallable<Void>(connection, tableName, rl.getRow(),
- operationTimeout) {
- public Void call() throws IOException {
- server.unlockRow(location.getRegionInfo().getRegionName(), rl
- .getLockId());
- return null;
- }
- }.withRetries();
+ new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
+ public Boolean call() throws IOException {
+ try {
+ UnlockRowRequest request = RequestConverter.buildUnlockRowRequest(
+ location.getRegionInfo().getRegionName(), rl.getLockId());
+ server.unlockRow(null, request);
+ return Boolean.TRUE;
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ }.withRetries();
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Fri Apr 13 20:28:21 2012
@@ -25,15 +25,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
+import com.google.protobuf.ServiceException;
+
/**
* Retries scanner operations such as create, next, etc.
* Used by {@link ResultScanner}s made by {@link HTable}.
@@ -107,41 +114,58 @@ public class ScannerCallable extends Ser
* @see java.util.concurrent.Callable#call()
*/
public Result [] call() throws IOException {
- if (scannerId != -1L && closed) {
- close();
- } else if (scannerId == -1L && !closed) {
- this.scannerId = openScanner();
+ if (closed) {
+ if (scannerId != -1) {
+ close();
+ }
} else {
- Result [] rrs = null;
- try {
- incRPCcallsMetrics();
- rrs = server.next(scannerId, caching);
- updateResultsMetrics(rrs);
- } catch (IOException e) {
- IOException ioe = null;
- if (e instanceof RemoteException) {
- ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
- }
- if (ioe == null) throw new IOException(e);
- if (ioe instanceof NotServingRegionException) {
- // Throw a DNRE so that we break out of cycle of calling NSRE
- // when what we need is to open scanner against new location.
- // Attach NSRE to signal client that it needs to resetup scanner.
- if (this.scanMetrics != null) {
- this.scanMetrics.countOfNSRE.inc();
+ if (scannerId == -1L) {
+ this.scannerId = openScanner();
+ } else {
+ Result [] rrs = null;
+ try {
+ incRPCcallsMetrics();
+ ScanRequest request =
+ RequestConverter.buildScanRequest(scannerId, caching, false);
+ try {
+ ScanResponse response = server.scan(null, request);
+ rrs = ResponseConverter.getResults(response);
+ if (response.hasMoreResults()
+ && !response.getMoreResults()) {
+ scannerId = -1L;
+ closed = true;
+ return null;
+ }
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ updateResultsMetrics(rrs);
+ } catch (IOException e) {
+ IOException ioe = null;
+ if (e instanceof RemoteException) {
+ ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
+ }
+ if (ioe == null) throw new IOException(e);
+ if (ioe instanceof NotServingRegionException) {
+ // Throw a DNRE so that we break out of cycle of calling NSRE
+ // when what we need is to open scanner against new location.
+ // Attach NSRE to signal client that it needs to resetup scanner.
+ if (this.scanMetrics != null) {
+ this.scanMetrics.countOfNSRE.inc();
+ }
+ throw new DoNotRetryIOException("Reset scanner", ioe);
+ } else if (ioe instanceof RegionServerStoppedException) {
+ // Throw a DNRE so that we break out of cycle of calling RSSE
+ // when what we need is to open scanner against new location.
+ // Attach RSSE to signal client that it needs to resetup scanner.
+ throw new DoNotRetryIOException("Reset scanner", ioe);
+ } else {
+ // The outer layers will retry
+ throw ioe;
}
- throw new DoNotRetryIOException("Reset scanner", ioe);
- } else if (ioe instanceof RegionServerStoppedException) {
- // Throw a DNRE so that we break out of cycle of calling RSSE
- // when what we need is to open scanner against new location.
- // Attach RSSE to signal client that it needs to resetup scanner.
- throw new DoNotRetryIOException("Reset scanner", ioe);
- } else {
- // The outer layers will retry
- throw ioe;
}
+ return rrs;
}
- return rrs;
}
return null;
}
@@ -161,10 +185,12 @@ public class ScannerCallable extends Ser
return;
}
for (Result rr : rrs) {
- this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
- if (isRegionServerRemote) {
- this.scanMetrics.countOfBytesInRemoteResults.inc(
- rr.getBytes().getLength());
+ if (rr.getBytes() != null) {
+ this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
+ if (isRegionServerRemote) {
+ this.scanMetrics.countOfBytesInRemoteResults.inc(
+ rr.getBytes().getLength());
+ }
}
}
}
@@ -175,7 +201,13 @@ public class ScannerCallable extends Ser
}
try {
incRPCcallsMetrics();
- this.server.close(this.scannerId);
+ ScanRequest request =
+ RequestConverter.buildScanRequest(this.scannerId, 0, true);
+ try {
+ server.scan(null, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
} catch (IOException e) {
LOG.warn("Ignore, probably already closed", e);
}
@@ -184,8 +216,16 @@ public class ScannerCallable extends Ser
protected long openScanner() throws IOException {
incRPCcallsMetrics();
- return this.server.openScanner(this.location.getRegionInfo().getRegionName(),
- this.scan);
+ ScanRequest request =
+ RequestConverter.buildScanRequest(
+ this.location.getRegionInfo().getRegionName(),
+ this.scan, 0, false);
+ try {
+ ScanResponse response = server.scan(null, request);
+ return response.getScannerId();
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
protected Scan getScan() {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Fri Apr 13 20:28:21 2012
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetr
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
@@ -57,7 +57,7 @@ public abstract class ServerCallable<T>
protected final byte [] tableName;
protected final byte [] row;
protected HRegionLocation location;
- protected HRegionInterface server;
+ protected ClientProtocol server;
protected int callTimeout;
protected long startTime, endTime;
@@ -84,8 +84,8 @@ public abstract class ServerCallable<T>
*/
public void connect(final boolean reload) throws IOException {
this.location = connection.getRegionLocation(tableName, row, reload);
- this.server = connection.getHRegionConnection(location.getHostname(),
- location.getPort());
+ this.server = connection.getClient(location.getHostname(),
+ location.getPort());
}
/** @return the server name
@@ -224,7 +224,7 @@ public abstract class ServerCallable<T>
}
}
- private static Throwable translateException(Throwable t) throws IOException {
+ protected static Throwable translateException(Throwable t) throws IOException {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java Fri Apr 13 20:28:21 2012
@@ -19,13 +19,10 @@
*/
package org.apache.hadoop.hbase.filter;
+import java.nio.ByteBuffer;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import org.apache.hadoop.hbase.filter.*;
/**
* ParseConstants holds a bunch of constants related to parsing Filter Strings
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Apr 13 20:28:21 2012
@@ -100,6 +100,7 @@ import org.apache.hadoop.io.WritableFact
import org.apache.hadoop.io.WritableUtils;
import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
/**
* This is a customized version of the polymorphic hadoop
@@ -268,6 +269,8 @@ public class HbaseObjectWritable impleme
GENERIC_ARRAY_CODE = code++;
addToMap(Array.class, GENERIC_ARRAY_CODE);
+ addToMap(RpcController.class, code++);
+
// make sure that this is the last statement in this static block
NEXT_CLASS_CODE = code;
}
@@ -357,7 +360,7 @@ public class HbaseObjectWritable impleme
}
}
- static Integer getClassCode(final Class<?> c)
+ public static Integer getClassCode(final Class<?> c)
throws IOException {
Integer code = CLASS_TO_CODE.get(c);
if (code == null ) {
@@ -726,7 +729,7 @@ public class HbaseObjectWritable impleme
* @return the instantiated Message instance
* @throws IOException if an IO problem occurs
*/
- private static Message tryInstantiateProtobuf(
+ public static Message tryInstantiateProtobuf(
Class<?> protoClass,
DataInput dataIn) throws IOException {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Fri Apr 13 20:28:21 2012
@@ -110,6 +110,14 @@ public class TimeRange implements Writab
}
/**
+ * Check if it is for all time
+ * @return true if it is for all time
+ */
+ public boolean isAllTime() {
+ return allTime;
+ }
+
+ /**
* Check if the specified timestamp is within this TimeRange.
* <p>
* Returns true if within interval [minStamp, maxStamp), false
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java Fri Apr 13 20:28:21 2012
@@ -19,18 +19,23 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.util.Bytes;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-
/**
* Backs a {@link CoprocessorProtocol} subclass proxy and forwards method
* invocations for server execution. Note that internally this will issue a
@@ -74,8 +79,13 @@ public class ExecRPCInvoker implements I
ServerCallable<ExecResult> callable =
new ServerCallable<ExecResult>(connection, table, row) {
public ExecResult call() throws Exception {
- return server.execCoprocessor(location.getRegionInfo().getRegionName(),
- exec);
+ byte[] regionName = location.getRegionInfo().getRegionName();
+ ExecCoprocessorRequest request =
+ RequestConverter.buildExecCoprocessorRequest(regionName, exec);
+ ExecCoprocessorResponse response =
+ server.execCoprocessor(null, request);
+ Object value = ProtobufUtil.toObject(response.getValue());
+ return new ExecResult(regionName, value);
}
};
ExecResult result = callable.withRetries();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Fri Apr 13 20:28:21 2012
@@ -19,19 +19,23 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.VersionedWritable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-
/** A method invocation, including the method name and its parameters.*/
@InterfaceAudience.Private
public class Invocation extends VersionedWritable implements Configurable {
@@ -43,6 +47,17 @@ public class Invocation extends Versione
private long clientVersion;
private int clientMethodsHash;
+
+ // For generated protocol classes which don't have VERSION field,
+ // such as protobuf interfaces.
+ private static final Map<Class<?>, Long>
+ PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
+
+ static {
+ PROTOCOL_VERSION.put(ClientService.BlockingInterface.class,
+ Long.valueOf(ClientProtocol.VERSION));
+ }
+
private static byte RPC_VERSION = 1;
public Invocation() {}
@@ -51,22 +66,28 @@ public class Invocation extends Versione
this.methodName = method.getName();
this.parameterClasses = method.getParameterTypes();
this.parameters = parameters;
- if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
+ Class<?> declaringClass = method.getDeclaringClass();
+ if (declaringClass.equals(VersionedProtocol.class)) {
//VersionedProtocol is exempted from version check.
clientVersion = 0;
clientMethodsHash = 0;
} else {
try {
- Field versionField = method.getDeclaringClass().getField("VERSION");
- versionField.setAccessible(true);
- this.clientVersion = versionField.getLong(method.getDeclaringClass());
+ Long version = PROTOCOL_VERSION.get(declaringClass);
+ if (version != null) {
+ this.clientVersion = version.longValue();
+ } else {
+ Field versionField = declaringClass.getField("VERSION");
+ versionField.setAccessible(true);
+ this.clientVersion = versionField.getLong(declaringClass);
+ }
} catch (NoSuchFieldException ex) {
- throw new RuntimeException("The " + method.getDeclaringClass(), ex);
+ throw new RuntimeException("The " + declaringClass, ex);
} catch (IllegalAccessException ex) {
throw new RuntimeException(ex);
}
- this.clientMethodsHash = ProtocolSignature.getFingerprint(method
- .getDeclaringClass().getMethods());
+ this.clientMethodsHash = ProtocolSignature.getFingerprint(
+ declaringClass.getMethods());
}
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Fri Apr 13 20:28:21 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Objects;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.*;
import org.codehaus.jackson.map.ObjectMapper;
+import com.google.protobuf.ServiceException;
+
/** An RpcEngine implementation for Writable data. */
@InterfaceAudience.Private
class WritableRpcEngine implements RpcEngine {
@@ -407,6 +410,9 @@ class WritableRpcEngine implements RpcEn
if (target instanceof IOException) {
throw (IOException)target;
}
+ if (target instanceof ServiceException) {
+ throw ProtobufUtil.getRemoteException((ServiceException)target);
+ }
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
throw ioe;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri Apr 13 20:28:21 2012
@@ -68,10 +68,13 @@ import org.apache.hadoop.hbase.io.Refere
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@@ -486,7 +489,11 @@ public class LoadIncrementalHFiles exten
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
- return server.bulkLoadHFiles(famPaths, regionName);
+ BulkLoadHFileRequest request =
+ RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
+ BulkLoadHFileResponse response =
+ server.bulkLoadHFile(null, request);
+ return response.getLoaded();
}
};
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.protobuf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that a HBase client uses to communicate with a region server.
+ */
+@KerberosInfo(
+ serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Private
+public interface AdminProtocol extends
+ AdminService.BlockingInterface, VersionedProtocol {
+ public static final long VERSION = 1L;
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.protobuf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that a HBase client uses to communicate with a region server.
+ */
+@KerberosInfo(
+ serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ClientProtocol extends
+ ClientService.BlockingInterface, VersionedProtocol {
+ public static final long VERSION = 1L;
+}