You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/18 20:53:30 UTC

svn commit: r1327629 - in /hbase/trunk: security/src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/ipc/

Author: stack
Date: Wed Apr 18 18:53:30 2012
New Revision: 1327629

URL: http://svn.apache.org/viewvc?rev=1327629&view=rev
Log:
HBASE-5810 HBASE-5620 Convert the client protocol of HRegionInterface to PB addendum

Modified:
    hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java

Modified: hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java (original)
+++ hbase/trunk/security/src/main/java/org/apache/hadoop/hbase/ipc/SecureRpcEngine.java Wed Apr 18 18:53:30 2012
@@ -20,30 +20,25 @@ package org.apache.hadoop.hbase.ipc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.security.HBasePolicyProvider;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Objects;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 
+import com.google.protobuf.ServiceException;
+
 import javax.net.SocketFactory;
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.*;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -161,14 +156,26 @@ public class SecureRpcEngine implements 
       if (logDebug) {
         startTime = System.currentTimeMillis();
       }
-      HbaseObjectWritable value = (HbaseObjectWritable)
-        client.call(new Invocation(method, args), address,
-                    protocol, ticket, rpcTimeout);
-      if (logDebug) {
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
+      try {
+        HbaseObjectWritable value = (HbaseObjectWritable)
+          client.call(new Invocation(method, args), address,
+                      protocol, ticket, rpcTimeout);
+        if (logDebug) {
+          long callTime = System.currentTimeMillis() - startTime;
+          LOG.debug("Call: " + method.getName() + " " + callTime);
+        }
+        return value.get();
+      } catch (Throwable t) {
+        // For protobuf protocols, ServiceException is expected
+        if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
+          if (t instanceof RemoteException) {
+            Throwable cause = ((RemoteException)t).unwrapRemoteException();
+            throw new ServiceException(cause);
+          }
+          throw new ServiceException(t);
+        }
+        throw t;
       }
-      return value.get();
     }
 
     /* close the IPC client that's responsible for this invoker's RPCs */
@@ -390,6 +397,9 @@ public class SecureRpcEngine implements 
         if (target instanceof IOException) {
           throw (IOException)target;
         }
+        if (target instanceof ServiceException) {
+          throw ProtobufUtil.getRemoteException((ServiceException)target);
+        }
         IOException ioe = new IOException(target.toString());
         ioe.setStackTrace(target.getStackTrace());
         throw ioe;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Wed Apr 18 18:53:30 2012
@@ -141,11 +141,10 @@ public class ScannerCallable extends Ser
           }
           updateResultsMetrics(rrs);
         } catch (IOException e) {
-          IOException ioe = null;
+          IOException ioe = e;
           if (e instanceof RemoteException) {
             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
           }
-          if (ioe == null) throw new IOException(e);
           if (ioe instanceof NotServingRegionException) {
             // Throw a DNRE so that we break out of cycle of calling NSRE
             // when what we need is to open scanner against new location.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Wed Apr 18 18:53:30 2012
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.ipc.RemoteException;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * Abstract class that implements {@link Callable}.  Implementation stipulates
  * return type and method we actually invoke on remote Server.  Usually
@@ -231,7 +233,13 @@ public abstract class ServerCallable<T> 
     if (t instanceof RemoteException) {
       t = ((RemoteException)t).unwrapRemoteException();
     }
-    if (t instanceof DoNotRetryIOException) {
+    if (t instanceof ServiceException) {
+      ServiceException se = (ServiceException)t;
+      Throwable cause = se.getCause();
+      if (cause != null && cause instanceof DoNotRetryIOException) {
+        throw (DoNotRetryIOException)cause;
+      }
+    } else if (t instanceof DoNotRetryIOException) {
       throw (DoNotRetryIOException)t;
     }
     return t;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java Wed Apr 18 18:53:30 2012
@@ -88,11 +88,11 @@ public class ExecRPCInvoker implements I
               return new ExecResult(regionName, value);
             }
           };
-      ExecResult result = callable.withRetries();
-      this.regionName = result.getRegionName();
-      LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
-          ", value="+result.getValue());
-      return result.getValue();
+        ExecResult result = callable.withRetries();
+        this.regionName = result.getRegionName();
+        LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
+            ", value="+result.getValue());
+        return result.getValue();
     }
 
     return null;

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java Wed Apr 18 18:53:30 2012
@@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.security.
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ReflectionUtils;
+
 import javax.net.SocketFactory;
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
@@ -138,7 +138,6 @@ public class HBaseRPC {
   /**
    * A version mismatch for the RPC protocol.
    */
-  @SuppressWarnings("serial")
   public static class VersionMismatch extends IOException {
     private static final long serialVersionUID = 0;
     private String interfaceName;
@@ -346,31 +345,6 @@ public class HBaseRPC {
   }
 
   /**
-   * Expert: Make multiple, parallel calls to a set of servers.
-   *
-   * @param method method to invoke
-   * @param params array of parameters
-   * @param addrs array of addresses
-   * @param conf configuration
-   * @return values
-   * @throws IOException e
-   * @deprecated Instead of calling statically, use
-   *     {@link HBaseRPC#getProtocolEngine(Class, org.apache.hadoop.conf.Configuration)}
-   *     to obtain an {@link RpcEngine} instance and then use
-   *     {@link RpcEngine#call(java.lang.reflect.Method, Object[][], java.net.InetSocketAddress[], Class, org.apache.hadoop.hbase.security.User, org.apache.hadoop.conf.Configuration)}
-   */
-  @Deprecated
-  public static Object[] call(Method method, Object[][] params,
-      InetSocketAddress[] addrs,
-      Class<? extends VersionedProtocol> protocol,
-      User ticket,
-      Configuration conf)
-    throws IOException, InterruptedException {
-    return getProtocolEngine(protocol, conf)
-      .call(method, params, addrs, protocol, ticket, conf);
-  }
-
-  /**
    * Construct a server for a protocol implementation instance listening on a
    * port and address.
    *

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Wed Apr 18 18:53:30 2012
@@ -25,12 +25,15 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.protobuf.AdminProtocol;
 import org.apache.hadoop.hbase.protobuf.ClientProtocol;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.io.VersionMismatchException;
@@ -58,6 +61,15 @@ public class Invocation extends Versione
       Long.valueOf(ClientProtocol.VERSION));
   }
 
+  // For protobuf protocols, which use ServiceException, instead of IOException
+  protected static final Set<Class<?>>
+    PROTOBUF_PROTOCOLS = new HashSet<Class<?>>();
+
+  static {
+    PROTOBUF_PROTOCOLS.add(ClientProtocol.class);
+    PROTOBUF_PROTOCOLS.add(AdminProtocol.class);
+  }
+
   private static byte RPC_VERSION = 1;
 
   public Invocation() {}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/RpcEngine.java Wed Apr 18 18:53:30 2012
@@ -19,7 +19,6 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import java.lang.reflect.Method;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import javax.net.SocketFactory;
@@ -42,17 +41,10 @@ interface RpcEngine {
   /** Stop this proxy. */
   void stopProxy(VersionedProtocol proxy);
 
-  /** Expert: Make multiple, parallel calls to a set of servers. */
-  Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
-                Class<? extends VersionedProtocol> protocol,
-                User ticket, Configuration conf)
-    throws IOException, InterruptedException;
-
   /** Construct a server for a protocol implementation instance. */
   RpcServer getServer(Class<? extends VersionedProtocol> protocol, Object instance,
                        Class<?>[] ifaces, String bindAddress,
                        int port, int numHandlers, int metaHandlerCount,
                        boolean verbose, Configuration conf, int highPriorityLevel)
       throws IOException;
-
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1327629&r1=1327628&r2=1327629&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Wed Apr 18 18:53:30 2012
@@ -22,9 +22,9 @@ package org.apache.hadoop.hbase.ipc;
 
 import java.lang.reflect.Proxy;
 import java.lang.reflect.Method;
-import java.lang.reflect.Array;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.UndeclaredThrowableException;
 
 import java.net.InetSocketAddress;
 import java.io.*;
@@ -45,9 +45,9 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.Objects;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.hbase.ipc.VersionedProtocol;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.*;
 
@@ -59,7 +59,7 @@ import com.google.protobuf.ServiceExcept
 @InterfaceAudience.Private
 class WritableRpcEngine implements RpcEngine {
   // LOG is NOT in hbase subpackage intentionally so that the default HBase
-  // DEBUG log level does NOT emit RPC-level logging. 
+  // DEBUG log level does NOT emit RPC-level logging.
   private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
 
   /* Cache a client using its socket factory as the hash key */
@@ -96,17 +96,6 @@ class WritableRpcEngine implements RpcEn
     }
 
     /**
-     * Construct & cache an IPC client with the default SocketFactory
-     * if no cached client exists.
-     *
-     * @param conf Configuration
-     * @return an IPC client
-     */
-    protected synchronized HBaseClient getClient(Configuration conf) {
-      return getClient(conf, SocketFactory.getDefault());
-    }
-
-    /**
      * Stop a RPC client connection
      * A RPC client is closed only when its reference count becomes zero.
      * @param client client to stop
@@ -152,15 +141,27 @@ class WritableRpcEngine implements RpcEn
         startTime = System.currentTimeMillis();
       }
 
-      HbaseObjectWritable value = (HbaseObjectWritable)
-        client.call(new Invocation(method, args), address,
-                    protocol, ticket, rpcTimeout);
-      if (logDebug) {
-        // FIGURE HOW TO TURN THIS OFF!
-        long callTime = System.currentTimeMillis() - startTime;
-        LOG.debug("Call: " + method.getName() + " " + callTime);
+      try {
+        HbaseObjectWritable value = (HbaseObjectWritable)
+          client.call(new Invocation(method, args), address,
+                      protocol, ticket, rpcTimeout);
+        if (logDebug) {
+          // FIGURE HOW TO TURN THIS OFF!
+          long callTime = System.currentTimeMillis() - startTime;
+          LOG.debug("Call: " + method.getName() + " " + callTime);
+        }
+        return value.get();
+      } catch (Throwable t) {
+        // For protobuf protocols, ServiceException is expected
+        if (Invocation.PROTOBUF_PROTOCOLS.contains(protocol)) {
+          if (t instanceof RemoteException) {
+            Throwable cause = ((RemoteException)t).unwrapRemoteException();
+            throw new ServiceException(cause);
+          }
+          throw new ServiceException(t);
+        }
+        throw t;
       }
-      return value.get();
     }
 
     /* close the IPC client that's responsible for this invoker's RPCs */
@@ -185,11 +186,25 @@ class WritableRpcEngine implements RpcEn
               protocol.getClassLoader(), new Class[] { protocol },
               new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
     if (proxy instanceof VersionedProtocol) {
-      long serverVersion = ((VersionedProtocol)proxy)
-        .getProtocolVersion(protocol.getName(), clientVersion);
-      if (serverVersion != clientVersion) {
-        throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
-                                      serverVersion);
+      try {
+        long serverVersion = ((VersionedProtocol)proxy)
+          .getProtocolVersion(protocol.getName(), clientVersion);
+        if (serverVersion != clientVersion) {
+          throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
+                                        serverVersion);
+        }
+      } catch (Throwable t) {
+        if (t instanceof UndeclaredThrowableException) {
+          t = t.getCause();
+        }
+        if (t instanceof ServiceException) {
+          throw ProtobufUtil.getRemoteException((ServiceException)t);
+        }
+        if (!(t instanceof IOException)) {
+          LOG.error("Unexpected throwable object ", t);
+          throw new IOException(t);
+        }
+        throw (IOException)t;
       }
     }
     return proxy;
@@ -205,38 +220,6 @@ class WritableRpcEngine implements RpcEn
     }
   }
 
-
-  /** Expert: Make multiple, parallel calls to a set of servers. */
-  public Object[] call(Method method, Object[][] params,
-                       InetSocketAddress[] addrs,
-                       Class<? extends VersionedProtocol> protocol,
-                       User ticket, Configuration conf)
-    throws IOException, InterruptedException {
-
-    Invocation[] invocations = new Invocation[params.length];
-    for (int i = 0; i < params.length; i++)
-      invocations[i] = new Invocation(method, params[i]);
-    HBaseClient client = CLIENTS.getClient(conf);
-    try {
-    Writable[] wrappedValues =
-      client.call(invocations, addrs, protocol, ticket);
-
-    if (method.getReturnType() == Void.TYPE) {
-      return null;
-    }
-
-    Object[] values =
-      (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
-    for (int i = 0; i < values.length; i++)
-      if (wrappedValues[i] != null)
-        values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
-
-    return values;
-    } finally {
-      CLIENTS.stopClient(client);
-    }
-  }
-
   /** Construct a server for a protocol implementation instance listening on a
    * port and address. */
   public Server getServer(Class<? extends VersionedProtocol> protocol,