You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/18 20:53:30 UTC
svn commit: r1327629 - in /hbase/trunk:
security/src/main/java/org/apache/hadoop/hbase/ipc/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/ipc/
Author: stack
Date: Wed Apr 18 18:53:30 2012
New Revision: 1327629
URL: http://svn.apache.org/viewvc?rev=1327629&view=rev
Log:
HBASE-5810 HBASE-5620 Convert the client protocol of HRegionInterface to PB addendum
Modified:
hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
Modified: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (original)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java Wed Apr 18 18:53:30 2012
@@ -20,30 +20,25 @@ package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.util.Objects;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import com.google.protobuf.ServiceException;
+
import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.*;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
@@ -161,14 +156,26 @@ public class SecureRpcEngine implements
if (logDebug) {
startTime = System.currentTimeMillis();
}
- HbaseObjectWritable value = (HbaseObjectWritable)
- client.call(new Invocation(method, args), address,
- protocol, ticket, rpcTimeout);
- if (logDebug) {
- long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
+ try {
+ HbaseObjectWritable value = (HbaseObjectWritable)
+ client.call(new Invocation(method, args), address,
+ protocol, ticket, rpcTimeout);
+ if (logDebug) {
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
+ }
+ return value.get();
+ } catch (Throwable t) {
+ // For protobuf protocols, ServiceException is expected
+ if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
+ if (t instanceof RemoteException) {
+ Throwable cause = ((RemoteException)t).unwrapRemoteException();
+ throw new ServiceException(cause);
+ }
+ throw new ServiceException(t);
+ }
+ throw t;
}
- return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
@@ -390,6 +397,9 @@ public class SecureRpcEngine implements
if (target instanceof IOException) {
throw (IOException)target;
}
+ if (target instanceof ServiceException) {
+ throw ProtobufUtil.getRemoteException((ServiceException)target);
+ }
IOException ioe = new IOException(target.toString());
ioe.setStackTrace(target.getStackTrace());
throw ioe;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Wed Apr 18 18:53:30 2012
@@ -141,11 +141,10 @@ public class ScannerCallable extends Ser
}
updateResultsMetrics(rrs);
} catch (IOException e) {
- IOException ioe = null;
+ IOException ioe = e;
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
- if (ioe == null) throw new IOException(e);
if (ioe instanceof NotServingRegionException) {
// Throw a DNRE so that we break out of cycle of calling NSRE
// when what we need is to open scanner against new location.
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Wed Apr 18 18:53:30 2012
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
+import com.google.protobuf.ServiceException;
+
/**
* Abstract class that implements {@link Callable}. Implementation stipulates
* return type and method we actually invoke on remote Server. Usually
@@ -231,7 +233,13 @@ public abstract class ServerCallable<T>
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
- if (t instanceof DoNotRetryIOException) {
+ if (t instanceof ServiceException) {
+ ServiceException se = (ServiceException)t;
+ Throwable cause = se.getCause();
+ if (cause != null && cause instanceof DoNotRetryIOException) {
+ throw (DoNotRetryIOException)cause;
+ }
+ } else if (t instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException)t;
}
return t;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java Wed Apr 18 18:53:30 2012
@@ -88,11 +88,11 @@ public class ExecRPCInvoker implements I
return new ExecResult(regionName, value);
}
};
- ExecResult result = callable.withRetries();
- this.regionName = result.getRegionName();
- LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
- ", value="+result.getValue());
- return result.getValue();
+ ExecResult result = callable.withRetries();
+ this.regionName = result.getRegionName();
+ LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
+ ", value="+result.getValue());
+ return result.getValue();
}
return null;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Wed Apr 18 18:53:30 2012
@@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.security.
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
+
import javax.net.SocketFactory;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
@@ -138,7 +138,6 @@ public class HBaseRPC {
/**
* A version mismatch for the RPC protocol.
*/
- @SuppressWarnings("serial")
public static class VersionMismatch extends IOException {
private static final long serialVersionUID = 0;
private String interfaceName;
@@ -346,31 +345,6 @@ public class HBaseRPC {
}
/**
- * Expert: Make multiple, parallel calls to a set of servers.
- *
- * @param method method to invoke
- * @param params array of parameters
- * @param addrs array of addresses
- * @param conf configuration
- * @return values
- * @throws IOException e
- * @deprecated Instead of calling statically, use
- * {@link HBaseRPC#getProtocolEngine(Class, org.apache.hadoop.conf.Configuration)}
- * to obtain an {@link RpcEngine} instance and then use
- * {@link RpcEngine#call(java.lang.reflect.Method, Object[][], java.net.InetSocketAddress[], Class, org.apache.hadoop.hbase.security.User, org.apache.hadoop.conf.Configuration)}
- */
- @Deprecated
- public static Object[] call(Method method, Object[][] params,
- InetSocketAddress[] addrs,
- Class<? extends VersionedProtocol> protocol,
- User ticket,
- Configuration conf)
- throws IOException, InterruptedException {
- return getProtocolEngine(protocol, conf)
- .call(method, params, addrs, protocol, ticket, conf);
- }
-
- /**
* Construct a server for a protocol implementation instance listening on a
* port and address.
*
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Wed Apr 18 18:53:30 2012
@@ -25,12 +25,15 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.protobuf.AdminProtocol;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.io.VersionMismatchException;
@@ -58,6 +61,15 @@ public class Invocation extends Versione
Long.valueOf(ClientProtocol.VERSION));
}
+ // For protobuf protocols, which use ServiceException, instead of IOException
+ protected static final Set<Class<?>>
+ PROTOBUF_PROTOCOLS = new HashSet<Class<?>>();
+
+ static {
+ PROTOBUF_PROTOCOLS.add(ClientProtocol.class);
+ PROTOBUF_PROTOCOLS.add(AdminProtocol.class);
+ }
+
private static byte RPC_VERSION = 1;
public Invocation() {}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java Wed Apr 18 18:53:30 2012
@@ -19,7 +19,6 @@
*/
package org.apache.hadoop.hbase.ipc;
-import java.lang.reflect.Method;
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
@@ -42,17 +41,10 @@ interface RpcEngine {
/** Stop this proxy. */
void stopProxy(VersionedProtocol proxy);
- /** Expert: Make multiple, parallel calls to a set of servers. */
- Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
- Class<? extends VersionedProtocol> protocol,
- User ticket, Configuration conf)
- throws IOException, InterruptedException;
-
/** Construct a server for a protocol implementation instance. */
RpcServer getServer(Class<? extends VersionedProtocol> protocol, Object instance,
Class<?>[] ifaces, String bindAddress,
int port, int numHandlers, int metaHandlerCount,
boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException;
-
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Wed Apr 18 18:53:30 2012
@@ -22,9 +22,9 @@ package org.apache.hadoop.hbase.ipc;
import java.lang.reflect.Proxy;
import java.lang.reflect.Method;
-import java.lang.reflect.Array;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.io.*;
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.Objects;
import org.apache.hadoop.io.*;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.*;
@@ -59,7 +59,7 @@ import com.google.protobuf.ServiceExcept
@InterfaceAudience.Private
class WritableRpcEngine implements RpcEngine {
// LOG is NOT in hbase subpackage intentionally so that the default HBase
- // DEBUG log level does NOT emit RPC-level logging.
+ // DEBUG log level does NOT emit RPC-level logging.
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
/* Cache a client using its socket factory as the hash key */
@@ -96,17 +96,6 @@ class WritableRpcEngine implements RpcEn
}
/**
- * Construct & cache an IPC client with the default SocketFactory
- * if no cached client exists.
- *
- * @param conf Configuration
- * @return an IPC client
- */
- protected synchronized HBaseClient getClient(Configuration conf) {
- return getClient(conf, SocketFactory.getDefault());
- }
-
- /**
* Stop a RPC client connection
* A RPC client is closed only when its reference count becomes zero.
* @param client client to stop
@@ -152,15 +141,27 @@ class WritableRpcEngine implements RpcEn
startTime = System.currentTimeMillis();
}
- HbaseObjectWritable value = (HbaseObjectWritable)
- client.call(new Invocation(method, args), address,
- protocol, ticket, rpcTimeout);
- if (logDebug) {
- // FIGURE HOW TO TURN THIS OFF!
- long callTime = System.currentTimeMillis() - startTime;
- LOG.debug("Call: " + method.getName() + " " + callTime);
+ try {
+ HbaseObjectWritable value = (HbaseObjectWritable)
+ client.call(new Invocation(method, args), address,
+ protocol, ticket, rpcTimeout);
+ if (logDebug) {
+ // FIGURE HOW TO TURN THIS OFF!
+ long callTime = System.currentTimeMillis() - startTime;
+ LOG.debug("Call: " + method.getName() + " " + callTime);
+ }
+ return value.get();
+ } catch (Throwable t) {
+ // For protobuf protocols, ServiceException is expected
+ if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
+ if (t instanceof RemoteException) {
+ Throwable cause = ((RemoteException)t).unwrapRemoteException();
+ throw new ServiceException(cause);
+ }
+ throw new ServiceException(t);
+ }
+ throw t;
}
- return value.get();
}
/* close the IPC client that's responsible for this invoker's RPCs */
@@ -185,11 +186,25 @@ class WritableRpcEngine implements RpcEn
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
if (proxy instanceof VersionedProtocol) {
- long serverVersion = ((VersionedProtocol)proxy)
- .getProtocolVersion(protocol.getName(), clientVersion);
- if (serverVersion != clientVersion) {
- throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
- serverVersion);
+ try {
+ long serverVersion = ((VersionedProtocol)proxy)
+ .getProtocolVersion(protocol.getName(), clientVersion);
+ if (serverVersion != clientVersion) {
+ throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
+ serverVersion);
+ }
+ } catch (Throwable t) {
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
+ }
+ if (t instanceof ServiceException) {
+ throw ProtobufUtil.getRemoteException((ServiceException)t);
+ }
+ if (!(t instanceof IOException)) {
+ LOG.error("Unexpected throwable object ", t);
+ throw new IOException(t);
+ }
+ throw (IOException)t;
}
}
return proxy;
@@ -205,38 +220,6 @@ class WritableRpcEngine implements RpcEn
}
}
-
- /** Expert: Make multiple, parallel calls to a set of servers. */
- public Object[] call(Method method, Object[][] params,
- InetSocketAddress[] addrs,
- Class<? extends VersionedProtocol> protocol,
- User ticket, Configuration conf)
- throws IOException, InterruptedException {
-
- Invocation[] invocations = new Invocation[params.length];
- for (int i = 0; i < params.length; i++)
- invocations[i] = new Invocation(method, params[i]);
- HBaseClient client = CLIENTS.getClient(conf);
- try {
- Writable[] wrappedValues =
- client.call(invocations, addrs, protocol, ticket);
-
- if (method.getReturnType() == Void.TYPE) {
- return null;
- }
-
- Object[] values =
- (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
- for (int i = 0; i < values.length; i++)
- if (wrappedValues[i] != null)
- values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
-
- return values;
- } finally {
- CLIENTS.stopClient(client);
- }
- }
-
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public Server getServer(Class<? extends VersionedProtocol> protocol,