Posted to commits@hbase.apache.org by st...@apache.org on 2012/04/13 22:28:22 UTC

svn commit: r1325937 [1/7] - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/catalog/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/filter/ main/java/org/apache/hadoop/hbase/io/ main/java/org/apache/hadoop/hbase/ipc...

Author: stack
Date: Fri Apr 13 20:28:21 2012
New Revision: 1325937

URL: http://svn.apache.org/viewvc?rev=1325937&view=rev
Log:
HBASE-5443 Convert the client protocol of HRegionInterface to PB
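
The recurring pattern in this change: each Writable-based HRegionInterface
call becomes a protobuf exchange. The client builds a request with
RequestConverter, invokes the generated ClientProtocol stub (the
RpcController argument is null throughout), converts the response back with
ProtobufUtil or ResponseConverter, and rethrows ServiceException as the
underlying IOException. A minimal sketch of the Get path, using only names
that appear in the diffs below (the wrapper class itself is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.protobuf.ClientProtocol;
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.protobuf.RequestConverter;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;

    import com.google.protobuf.ServiceException;

    // Hypothetical wrapper; shows the before/after shape of one RPC.
    public class PbGetSketch {
      // Before: Result r = server.get(regionName, get);  // HRegionInterface
      static Result get(ClientProtocol server, byte[] regionName, Get get)
          throws IOException {
        GetRequest request = RequestConverter.buildGetRequest(regionName, get);
        try {
          GetResponse response = server.get(null, request); // null RpcController
          return ProtobufUtil.toResult(response.getResult());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se); // unwrap to IOException
        }
      }
    }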

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServer.java
    hbase/trunk/src/main/protobuf/Admin.proto
    hbase/trunk/src/main/protobuf/Client.proto
Removed:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionAdminProtos.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/RegionClientProtos.java
    hbase/trunk/src/main/protobuf/RegionAdmin.proto
    hbase/trunk/src/main/protobuf/RegionClient.proto
Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HBaseProtos.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
    hbase/trunk/src/main/protobuf/hbase.proto
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaReaderEditorNoCluster.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/TestHbaseObjectWritable.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/OOMERegionServer.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java Fri Apr 13 20:28:21 2012
@@ -31,18 +31,15 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.ipc.RemoteException;
 
 /**
  * Reads region and assignment information from <code>.META.</code>.
@@ -253,19 +250,6 @@ public class MetaReader {
   }
 
   /**
-   * Reads the location of META from ROOT.
-   * @param metaServer connection to server hosting ROOT
-   * @return location of META in ROOT where location, or null if not available
-   * @throws IOException
-   * @deprecated Does not retry; use #getMetaRegionLocation(CatalogTracker)
-   */
-  public static ServerName readMetaLocation(HRegionInterface metaServer)
-  throws IOException {
-    return readLocation(metaServer, CatalogTracker.ROOT_REGION_NAME,
-        CatalogTracker.META_REGION_NAME);
-  }
-
-  /**
    * Gets the location of <code>.META.</code> region by reading content of
    * <code>-ROOT-</code>.
    * @param ct
@@ -292,50 +276,6 @@ public class MetaReader {
     return (pair == null || pair.getSecond() == null)? null: pair.getSecond();
   }
 
-  // TODO: Remove when deprecated dependencies are removed.
-  private static ServerName readLocation(HRegionInterface metaServer,
-      byte [] catalogRegionName, byte [] regionName)
-  throws IOException {
-    Result r = null;
-    try {
-      r = metaServer.get(catalogRegionName,
-        new Get(regionName).
-        addColumn(HConstants.CATALOG_FAMILY,
-          HConstants.SERVER_QUALIFIER).
-        addColumn(HConstants.CATALOG_FAMILY,
-          HConstants.STARTCODE_QUALIFIER));
-    } catch (java.net.SocketTimeoutException e) {
-      // Treat this exception + message as unavailable catalog table. Catch it
-      // and fall through to return a null
-    } catch (java.net.SocketException e) {
-      // Treat this exception + message as unavailable catalog table. Catch it
-      // and fall through to return a null
-    } catch (RemoteException re) {
-      IOException ioe = re.unwrapRemoteException();
-      if (ioe instanceof NotServingRegionException) {
-        // Treat this NSRE as unavailable table.  Catch and fall through to
-        // return null below
-      } else if (ioe.getMessage().contains("Server not running")) {
-        // Treat as unavailable table.
-      } else {
-        throw re;
-      }
-    } catch (IOException e) {
-      if (e.getCause() != null && e.getCause() instanceof IOException &&
-          e.getCause().getMessage() != null &&
-          e.getCause().getMessage().contains("Connection reset by peer")) {
-        // Treat this exception + message as unavailable catalog table. Catch it
-        // and fall through to return a null
-      } else {
-        throw e;
-      }
-    }
-    if (r == null || r.isEmpty()) {
-      return null;
-    }
-    return getServerNameFromCatalogResult(r);
-  }
-
   /**
    * Gets the region info and assignment for the specified region.
    * @param catalogTracker
@@ -655,35 +595,6 @@ public class MetaReader {
   }
 
   /**
-   * Fully scan a given region, on a given server starting with given row.
-   * @param hRegionInterface region server
-   * @param visitor visitor
-   * @param regionName name of region
-   * @param startrow start row
-   * @throws IOException
-   * @deprecated Does not retry; use fullScan xxx instead.
-   x
-   */
-  public static void fullScan(HRegionInterface hRegionInterface,
-                              Visitor visitor, final byte[] regionName,
-                              byte[] startrow) throws IOException {
-    if (hRegionInterface == null) return;
-    Scan scan = new Scan();
-    if (startrow != null) scan.setStartRow(startrow);
-    scan.addFamily(HConstants.CATALOG_FAMILY);
-    long scannerid = hRegionInterface.openScanner(regionName, scan);
-    try {
-      Result data;
-      while((data = hRegionInterface.next(scannerid)) != null) {
-        if (!data.isEmpty()) visitor.visit(data);
-      }
-    } finally {
-      hRegionInterface.close(scannerid);
-    }
-    return;
-  }
-
-  /**
    * Performs a full scan of a catalog table.
    * @param catalogTracker
    * @param visitor Visitor invoked against each row.

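The MetaReader deletions above drop the deprecated, non-retrying helpers
that spoke HRegionInterface directly; the CatalogTracker-based methods that
remain do retry and are the intended replacements. A hedged sketch of the
replacement for the removed fullScan overload (the Visitor shape is assumed
from its use elsewhere in this class):

    // Sketch: fullScan(CatalogTracker, Visitor) replaces
    // fullScan(HRegionInterface, Visitor, byte[], byte[]).
    MetaReader.fullScan(catalogTracker, new MetaReader.Visitor() {
      public boolean visit(Result data) throws IOException {
        if (!data.isEmpty()) {
          // process one catalog row
        }
        return true; // assumed convention: true = keep scanning
      }
    });
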
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Fri Apr 13 20:28:21 2012
@@ -307,6 +307,7 @@ public class ClientScanner extends Abstr
             }
             // Clear region
             this.currentRegion = null;
+            callable = null;
             continue;
           }
           long currentTime = System.currentTimeMillis();

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Fri Apr 13 20:28:21 2012
@@ -19,6 +19,17 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketTimeoutException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Pattern;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -45,6 +56,12 @@ import org.apache.hadoop.hbase.catalog.M
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -54,16 +71,7 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.SocketTimeoutException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Pattern;
+import com.google.protobuf.ServiceException;
 
 /**
  * Provides an interface to manage HBase database table metadata + general
@@ -497,23 +505,27 @@ public class HBaseAdmin implements Abort
     });
 
     // Wait until all regions deleted
-    HRegionInterface server =
-      connection.getHRegionConnection(firstMetaServer.getHostname(), firstMetaServer.getPort());
+    ClientProtocol server =
+      connection.getClient(firstMetaServer.getHostname(), firstMetaServer.getPort());
     for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) {
-      long scannerId = -1L;
       try {
 
         Scan scan = MetaReader.getScanForTableName(tableName);
-        scan.addColumn(HConstants.CATALOG_FAMILY,
-            HConstants.REGIONINFO_QUALIFIER);
-        scannerId = server.openScanner(
-          firstMetaServer.getRegionInfo().getRegionName(), scan);
+        scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+        ScanRequest request = RequestConverter.buildScanRequest(
+          firstMetaServer.getRegionInfo().getRegionName(), scan, 1, true);
+        Result[] values = null;
         // Get a batch at a time.
-        Result values = server.next(scannerId);
+        try {
+          ScanResponse response = server.scan(null, request);
+          values = ResponseConverter.getResults(response);
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
 
         // let us wait until .META. table is updated and
         // HMaster removes the table from its HTableDescriptors
-        if (values == null) {
+        if (values == null || values.length == 0) {
           boolean tableExists = false;
           HTableDescriptor[] htds;
           MasterKeepAliveConnection master = connection.getKeepAliveMaster();
@@ -542,14 +554,6 @@ public class HBaseAdmin implements Abort
             throw ex;
           }
         }
-      } finally {
-        if (scannerId != -1L) {
-          try {
-            server.close(scannerId);
-          } catch (IOException ex) {
-            LOG.warn(ex);
-          }
-        }
       }
       try {
         Thread.sleep(getPauseTime(tries));

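The HBaseAdmin hunk above also shows the new scanner economics: the old
code had to openScanner, next, and close in a finally block, while a
protobuf ScanRequest can fetch a bounded number of rows and close the
scanner in the same round trip, so the scannerId bookkeeping disappears.
Condensed sketch (buildScanRequest argument order taken from the diff and
assumed to be number-of-rows, then closeScanner):

    // One RPC: scan at most one row and close the server-side scanner.
    ScanRequest request = RequestConverter.buildScanRequest(
        regionName, scan, 1, true);     // 1 row, closeScanner = true
    try {
      ScanResponse response = server.scan(null, request);
      Result[] values = ResponseConverter.getResults(response);
      // null or empty => no rows remain for this table
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
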
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Fri Apr 13 20:28:21 2012
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.co
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 
 /**
@@ -221,6 +222,19 @@ public interface HConnection extends Abo
   throws IOException;
 
   /**
+   * Establishes a connection to the region server at the specified address, and
+   * returns a client protocol for talking to that region server.
+   *
+   * @param hostname RegionServer hostname
+   * @param port RegionServer port
+   * @return ClientProtocol proxy for RegionServer
+   * @throws IOException if a remote or network exception occurs
+   *
+   */
+  public ClientProtocol getClient(final String hostname, final int port)
+  throws IOException;
+
+  /**
    * Establishes a connection to the region server at the specified address.
    * @param regionServer - the server to connect to
    * @param getMaster - do we check if master is alive

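For callers, getClient is a drop-in sibling of the deprecated
getHRegionConnection. A hedged usage sketch, assuming an HConnection and a
resolved HRegionLocation for the row of interest:

    ClientProtocol client = connection.getClient(
        location.getHostname(), location.getPort());
    // client now speaks the protobuf ClientService: get, mutate, multi, scan.
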
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Fri Apr 13 20:28:21 2012
@@ -19,6 +19,33 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -46,6 +73,13 @@ import org.apache.hadoop.hbase.ipc.ExecR
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -61,32 +95,7 @@ import org.apache.hadoop.hbase.zookeeper
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.protobuf.ServiceException;
 
 /**
  * A non-instantiable class that manages {@link HConnection}s.
@@ -146,6 +155,12 @@ public class HConnectionManager {
 
   public static final int MAX_CACHED_HBASE_INSTANCES;
 
+  /** Parameter name for what client protocol to use. */
+  public static final String CLIENT_PROTOCOL_CLASS = "hbase.clientprotocol.class";
+
+  /** Default client protocol class name. */
+  public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
+
   private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
 
   static {
@@ -493,6 +508,7 @@ public class HConnectionManager {
   static class HConnectionImplementation implements HConnection, Closeable {
     static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
     private final Class<? extends HRegionInterface> serverInterfaceClass;
+    private final Class<? extends ClientProtocol> clientClass;
     private final long pause;
     private final int numRetries;
     private final int maxRPCAttempts;
@@ -521,8 +537,8 @@ public class HConnectionManager {
     private final Configuration conf;
     // Known region HServerAddress.toString() -> HRegionInterface
 
-    private final Map<String, HRegionInterface> servers =
-      new ConcurrentHashMap<String, HRegionInterface>();
+    private final ConcurrentHashMap<String, Map<String, VersionedProtocol>> servers =
+      new ConcurrentHashMap<String, Map<String, VersionedProtocol>>();
     private final ConcurrentHashMap<String, String> connectionLock =
       new ConcurrentHashMap<String, String>();
 
@@ -570,6 +586,15 @@ public class HConnectionManager {
         throw new UnsupportedOperationException(
             "Unable to find region server interface " + serverClassName, e);
       }
+      String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
+        DEFAULT_CLIENT_PROTOCOL_CLASS);
+      try {
+        this.clientClass =
+          (Class<? extends ClientProtocol>) Class.forName(clientClassName);
+      } catch (ClassNotFoundException e) {
+        throw new UnsupportedOperationException(
+            "Unable to find client protocol " + clientClassName, e);
+      }
       this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
           HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
       this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -1329,18 +1354,28 @@ public class HConnectionManager {
     }
 
     @Override
+    public ClientProtocol getClient(
+        final String hostname, final int port) throws IOException {
+      return (ClientProtocol)getProtocol(hostname, port,
+        clientClass, ClientProtocol.VERSION);
+    }
+
+    @Override
     @Deprecated
     public HRegionInterface getHRegionConnection(HServerAddress hsa,
         boolean master)
     throws IOException {
-      return getHRegionConnection(null, -1, hsa.getInetSocketAddress(), master);
+      String hostname = hsa.getInetSocketAddress().getHostName();
+      int port = hsa.getInetSocketAddress().getPort();
+      return getHRegionConnection(hostname, port, master);
     }
 
     @Override
     public HRegionInterface getHRegionConnection(final String hostname,
         final int port, final boolean master)
     throws IOException {
-      return getHRegionConnection(hostname, port, null, master);
+      return (HRegionInterface)getProtocol(hostname, port,
+        serverInterfaceClass, HRegionInterface.VERSION);
     }
 
     /**
@@ -1348,43 +1383,44 @@ public class HConnectionManager {
      * can be but not both.
      * @param hostname
      * @param port
-     * @param isa
-     * @param master
+     * @param protocolClass
+     * @param version
      * @return Proxy.
      * @throws IOException
      */
-    HRegionInterface getHRegionConnection(final String hostname, final int port,
-        final InetSocketAddress isa, final boolean master)
-    throws IOException {
-      HRegionInterface server;
-      String rsName = null;
-      if (isa != null) {
-        rsName = Addressing.createHostAndPortStr(isa.getHostName(),
-            isa.getPort());
-      } else {
-        rsName = Addressing.createHostAndPortStr(hostname, port);
-      }
+    VersionedProtocol getProtocol(final String hostname,
+        final int port, final Class <? extends VersionedProtocol> protocolClass,
+        final long version) throws IOException {
+      String rsName = Addressing.createHostAndPortStr(hostname, port);
       // See if we already have a connection (common case)
-      server = this.servers.get(rsName);
+      Map<String, VersionedProtocol> protocols = this.servers.get(rsName);
+      if (protocols == null) {
+        protocols = new HashMap<String, VersionedProtocol>();
+        Map<String, VersionedProtocol> existingProtocols =
+          this.servers.putIfAbsent(rsName, protocols);
+        if (existingProtocols != null) {
+          protocols = existingProtocols;
+        }
+      }
+      String protocol = protocolClass.getName();
+      VersionedProtocol server = protocols.get(protocol);
       if (server == null) {
-        // create a unique lock for this RS (if necessary)
-        this.connectionLock.putIfAbsent(rsName, rsName);
+        // create a unique lock for this RS + protocol (if necessary)
+        String lockKey = protocol + "@" + rsName;
+        this.connectionLock.putIfAbsent(lockKey, lockKey);
         // get the RS lock
-        synchronized (this.connectionLock.get(rsName)) {
+        synchronized (this.connectionLock.get(lockKey)) {
           // do one more lookup in case we were stalled above
-          server = this.servers.get(rsName);
+          server = protocols.get(protocol);
           if (server == null) {
             try {
               // Only create isa when we need to.
-              InetSocketAddress address = isa != null? isa:
-                new InetSocketAddress(hostname, port);
+              InetSocketAddress address = new InetSocketAddress(hostname, port);
               // definitely a cache miss. establish an RPC for this RS
-              server = (HRegionInterface) HBaseRPC.waitForProxy(
-                  serverInterfaceClass, HRegionInterface.VERSION,
-                  address, this.conf,
+              server = HBaseRPC.waitForProxy(
+                  protocolClass, version, address, this.conf,
                   this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
-              this.servers.put(Addressing.createHostAndPortStr(
-                  address.getHostName(), address.getPort()), server);
+              protocols.put(protocol, server);
             } catch (RemoteException e) {
               LOG.warn("RemoteException connecting to RS", e);
               // Throw what the RemoteException was carrying.
@@ -1679,11 +1715,42 @@ public class HConnectionManager {
          ServerCallable<MultiResponse> callable =
            new ServerCallable<MultiResponse>(connection, tableName, null) {
              public MultiResponse call() throws IOException {
-               return server.multi(multi);
+               try {
+                 MultiResponse response = new MultiResponse();
+                 for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
+                   byte[] regionName = e.getKey();
+                   int rowMutations = 0;
+                   List<Action<R>> actions = e.getValue();
+                   for (Action<R> action: actions) {
+                     Row row = action.getAction();
+                     if (row instanceof RowMutations) {
+                       MultiRequest request =
+                         RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
+                       server.multi(null, request);
+                       response.add(regionName, action.getOriginalIndex(), new Result());
+                       rowMutations++;
+                     }
+                   }
+                   if (actions.size() > rowMutations) {
+                     MultiRequest request =
+                       RequestConverter.buildMultiRequest(regionName, actions);
+                     ClientProtos.MultiResponse
+                       proto = server.multi(null, request);
+                     List<Object> results = ResponseConverter.getResults(proto);
+                     for (int i = 0, n = results.size(); i < n; i++) {
+                       int originalIndex = actions.get(i).getOriginalIndex();
+                       response.add(regionName, originalIndex, results.get(i));
+                     }
+                   }
+                 }
+                 return response;
+               } catch (ServiceException se) {
+                 throw ProtobufUtil.getRemoteException(se);
+               }
              }
              @Override
              public void connect(boolean reload) throws IOException {
-               server = connection.getHRegionConnection(
+               server = connection.getClient(
                  loc.getHostname(), loc.getPort());
              }
            };
@@ -1885,7 +1952,7 @@ public class HConnectionManager {
               }
             }
           } catch (ExecutionException e) {
-            LOG.warn("Failed all from " + loc, e);
+            LOG.debug("Failed all from " + loc, e);
           }
         }
 
@@ -2077,8 +2144,10 @@ public class HConnectionManager {
       delayedClosing.stop("Closing connection");
       if (stopProxy) {
         closeMaster();
-        for (HRegionInterface i : servers.values()) {
-          HBaseRPC.stopProxy(i);
+        for (Map<String, VersionedProtocol> i : servers.values()) {
+          for (VersionedProtocol server: i.values()) {
+            HBaseRPC.stopProxy(server);
+          }
         }
       }
       closeZooKeeperWatcher();

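The cache restructure above is load-bearing: the flat rsName ->
HRegionInterface map becomes a two-level map keyed by host:port and then by
protocol class name, so one region server can hold both an HRegionInterface
proxy and a ClientProtocol proxy during the transition. The lock-free fast
path relies on putIfAbsent; a self-contained sketch of the same idiom
(generic, not HBase-specific):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Two-level cache: outer key = "host:port", inner key = protocol name.
    public class ProtocolCache<P> {
      private final ConcurrentHashMap<String, Map<String, P>> servers =
          new ConcurrentHashMap<String, Map<String, P>>();

      Map<String, P> protocolsFor(String rsName) {
        Map<String, P> protocols = servers.get(rsName);
        if (protocols == null) {
          protocols = new HashMap<String, P>();
          // Another thread may have raced us; if so, use its map.
          Map<String, P> existing = servers.putIfAbsent(rsName, protocols);
          if (existing != null) {
            protocols = existing;
          }
        }
        // The inner HashMap is not thread-safe; mutate it only under a
        // per-entry lock, as HConnectionManager does with connectionLock.
        return protocols;
      }
    }
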
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri Apr 13 20:28:21 2012
@@ -53,13 +53,27 @@ import org.apache.hadoop.hbase.ServerNam
 import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition.CompareType;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.LockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * <p>Used to communicate with a single HBase table.
  *
@@ -648,8 +662,15 @@ public class HTable implements HTableInt
    throws IOException {
      return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
        public Result call() throws IOException {
-         return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
-           row, family);
+         try {
+           GetRequest request = RequestConverter.buildGetRequest(
+               location.getRegionInfo().getRegionName(), row, family, true);
+           GetResponse response = server.get(null, request);
+           if (!response.hasResult()) return null;
+           return ProtobufUtil.toResult(response.getResult());
+         } catch (ServiceException se) {
+           throw ProtobufUtil.getRemoteException(se);
+         }
        }
      }.withRetries();
    }
@@ -694,7 +715,14 @@ public class HTable implements HTableInt
   public Result get(final Get get) throws IOException {
     return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
           public Result call() throws IOException {
-            return server.get(location.getRegionInfo().getRegionName(), get);
+            try {
+              GetRequest request = RequestConverter.buildGetRequest(
+                  location.getRegionInfo().getRegionName(), get);
+              GetResponse response = server.get(null, request);
+              return ProtobufUtil.toResult(response.getResult());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
           }
         }.withRetries();
   }
@@ -746,13 +774,18 @@ public class HTable implements HTableInt
   @Override
   public void delete(final Delete delete)
   throws IOException {
-    new ServerCallable<Void>(connection, tableName, delete.getRow(),
-        operationTimeout) {
-      public Void call() throws IOException {
-        server.delete(location.getRegionInfo().getRegionName(), delete);
-        return null;
-      }
-    }.withRetries();
+    new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
+          public Boolean call() throws IOException {
+            try {
+              MutateRequest request = RequestConverter.buildMutateRequest(
+                location.getRegionInfo().getRegionName(), delete);
+              MutateResponse response = server.mutate(null, request);
+              return Boolean.valueOf(response.getProcessed());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        }.withRetries();
   }
 
   /**
@@ -821,7 +854,13 @@ public class HTable implements HTableInt
     new ServerCallable<Void>(connection, tableName, rm.getRow(),
         operationTimeout) {
       public Void call() throws IOException {
-        server.mutateRow(location.getRegionInfo().getRegionName(), rm);
+        try {
+          MultiRequest request = RequestConverter.buildMultiRequest(
+            location.getRegionInfo().getRegionName(), rm);
+          server.multi(null, request);
+        } catch (ServiceException se) {
+          throw ProtobufUtil.getRemoteException(se);
+        }
         return null;
       }
     }.withRetries();
@@ -838,8 +877,15 @@ public class HTable implements HTableInt
     }
     return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
           public Result call() throws IOException {
-            return server.append(
+            try {
+              MutateRequest request = RequestConverter.buildMutateRequest(
                 location.getRegionInfo().getRegionName(), append);
+              MutateResponse response = server.mutate(null, request);
+              if (!response.hasResult()) return null;
+              return ProtobufUtil.toResult(response.getResult());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
           }
         }.withRetries();
   }
@@ -855,8 +901,14 @@ public class HTable implements HTableInt
     }
     return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
           public Result call() throws IOException {
-            return server.increment(
+            try {
+              MutateRequest request = RequestConverter.buildMutateRequest(
                 location.getRegionInfo().getRegionName(), increment);
+              MutateResponse response = server.mutate(null, request);
+              return ProtobufUtil.toResult(response.getResult());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
           }
         }.withRetries();
   }
@@ -890,9 +942,16 @@ public class HTable implements HTableInt
     }
     return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
           public Long call() throws IOException {
-            return server.incrementColumnValue(
+            try {
+              MutateRequest request = RequestConverter.buildMutateRequest(
                 location.getRegionInfo().getRegionName(), row, family,
                 qualifier, amount, writeToWAL);
+              MutateResponse response = server.mutate(null, request);
+              Result result = ProtobufUtil.toResult(response.getResult());
+              return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
           }
         }.withRetries();
   }
@@ -907,8 +966,15 @@ public class HTable implements HTableInt
   throws IOException {
     return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
           public Boolean call() throws IOException {
-            return server.checkAndPut(location.getRegionInfo().getRegionName(),
-                row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
+            try {
+              MutateRequest request = RequestConverter.buildMutateRequest(
+                location.getRegionInfo().getRegionName(), row, family, qualifier,
+                new BinaryComparator(value), CompareType.EQUAL, put);
+              MutateResponse response = server.mutate(null, request);
+              return Boolean.valueOf(response.getProcessed());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
           }
         }.withRetries();
   }
@@ -924,10 +990,15 @@ public class HTable implements HTableInt
   throws IOException {
     return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
           public Boolean call() throws IOException {
-            return server.checkAndDelete(
-                location.getRegionInfo().getRegionName(),
-                row, family, qualifier, value, delete)
-            ? Boolean.TRUE : Boolean.FALSE;
+            try {
+              MutateRequest request = RequestConverter.buildMutateRequest(
+                location.getRegionInfo().getRegionName(), row, family, qualifier,
+                new BinaryComparator(value), CompareType.EQUAL, delete);
+              MutateResponse response = server.mutate(null, request);
+              return Boolean.valueOf(response.getProcessed());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
           }
         }.withRetries();
   }
@@ -939,8 +1010,14 @@ public class HTable implements HTableInt
   public boolean exists(final Get get) throws IOException {
     return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
           public Boolean call() throws IOException {
-            return server.
-                exists(location.getRegionInfo().getRegionName(), get);
+            try {
+              GetRequest request = RequestConverter.buildGetRequest(
+                  location.getRegionInfo().getRegionName(), get, true);
+              GetResponse response = server.get(null, request);
+              return response.getExists();
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
           }
         }.withRetries();
   }
@@ -1026,9 +1103,14 @@ public class HTable implements HTableInt
   throws IOException {
     return new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
         public RowLock call() throws IOException {
-          long lockId =
-              server.lockRow(location.getRegionInfo().getRegionName(), row);
-          return new RowLock(row,lockId);
+          try {
+            LockRowRequest request = RequestConverter.buildLockRowRequest(
+              location.getRegionInfo().getRegionName(), row);
+            LockRowResponse response = server.lockRow(null, request);
+            return new RowLock(row, response.getLockId());
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
         }
       }.withRetries();
   }
@@ -1039,14 +1121,18 @@ public class HTable implements HTableInt
   @Override
   public void unlockRow(final RowLock rl)
   throws IOException {
-    new ServerCallable<Void>(connection, tableName, rl.getRow(),
-        operationTimeout) {
-      public Void call() throws IOException {
-        server.unlockRow(location.getRegionInfo().getRegionName(), rl
-            .getLockId());
-        return null;
-      }
-    }.withRetries();
+    new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
+        public Boolean call() throws IOException {
+          try {
+            UnlockRowRequest request = RequestConverter.buildUnlockRowRequest(
+              location.getRegionInfo().getRegionName(), rl.getLockId());
+            server.unlockRow(null, request);
+            return Boolean.TRUE;
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+        }
+      }.withRetries();
   }
 
   /**

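A design note on the HTable rewrite above: delete, append, increment,
incrementColumnValue, checkAndPut and checkAndDelete all now funnel through
the single mutate RPC, and the checkAndXxx variants are expressed as a
mutate carrying a condition (comparator plus CompareType) rather than
dedicated methods. Condensed sketch of the conditional form, using names
from the diff:

    // checkAndPut, protobuf-style: the check rides along as a condition.
    MutateRequest request = RequestConverter.buildMutateRequest(
        regionName, row, family, qualifier,
        new BinaryComparator(value), CompareType.EQUAL, put);
    try {
      MutateResponse response = server.mutate(null, request);
      boolean applied = response.getProcessed(); // true if the put went in
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
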
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Fri Apr 13 20:28:21 2012
@@ -25,15 +25,22 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 
+import com.google.protobuf.ServiceException;
+
 /**
  * Retries scanner operations such as create, next, etc.
  * Used by {@link ResultScanner}s made by {@link HTable}.
@@ -107,41 +114,58 @@ public class ScannerCallable extends Ser
    * @see java.util.concurrent.Callable#call()
    */
   public Result [] call() throws IOException {
-    if (scannerId != -1L && closed) {
-      close();
-    } else if (scannerId == -1L && !closed) {
-      this.scannerId = openScanner();
+    if (closed) {
+      if (scannerId != -1) {
+        close();
+      }
     } else {
-      Result [] rrs = null;
-      try {
-        incRPCcallsMetrics();
-        rrs = server.next(scannerId, caching);
-        updateResultsMetrics(rrs);
-      } catch (IOException e) {
-        IOException ioe = null;
-        if (e instanceof RemoteException) {
-          ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
-        }
-        if (ioe == null) throw new IOException(e);
-        if (ioe instanceof NotServingRegionException) {
-          // Throw a DNRE so that we break out of cycle of calling NSRE
-          // when what we need is to open scanner against new location.
-          // Attach NSRE to signal client that it needs to resetup scanner.
-          if (this.scanMetrics != null) {
-            this.scanMetrics.countOfNSRE.inc();
+      if (scannerId == -1L) {
+        this.scannerId = openScanner();
+      } else {
+        Result [] rrs = null;
+        try {
+          incRPCcallsMetrics();
+          ScanRequest request =
+            RequestConverter.buildScanRequest(scannerId, caching, false);
+          try {
+            ScanResponse response = server.scan(null, request);
+            rrs = ResponseConverter.getResults(response);
+            if (response.hasMoreResults()
+                && !response.getMoreResults()) {
+              scannerId = -1L;
+              closed = true;
+              return null;
+            }
+          } catch (ServiceException se) {
+            throw ProtobufUtil.getRemoteException(se);
+          }
+          updateResultsMetrics(rrs);
+        } catch (IOException e) {
+          IOException ioe = null;
+          if (e instanceof RemoteException) {
+            ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
+          }
+          if (ioe == null) throw new IOException(e);
+          if (ioe instanceof NotServingRegionException) {
+            // Throw a DNRE so that we break out of cycle of calling NSRE
+            // when what we need is to open scanner against new location.
+            // Attach NSRE to signal client that it needs to resetup scanner.
+            if (this.scanMetrics != null) {
+              this.scanMetrics.countOfNSRE.inc();
+            }
+            throw new DoNotRetryIOException("Reset scanner", ioe);
+          } else if (ioe instanceof RegionServerStoppedException) {
+            // Throw a DNRE so that we break out of cycle of calling RSSE
+            // when what we need is to open scanner against new location.
+            // Attach RSSE to signal client that it needs to resetup scanner.
+            throw new DoNotRetryIOException("Reset scanner", ioe);
+          } else {
+            // The outer layers will retry
+            throw ioe;
           }
-          throw new DoNotRetryIOException("Reset scanner", ioe);
-        } else if (ioe instanceof RegionServerStoppedException) {
-          // Throw a DNRE so that we break out of cycle of calling RSSE
-          // when what we need is to open scanner against new location.
-          // Attach RSSE to signal client that it needs to resetup scanner.
-          throw new DoNotRetryIOException("Reset scanner", ioe);
-        } else {
-          // The outer layers will retry
-          throw ioe;
         }
+        return rrs;
       }
-      return rrs;
     }
     return null;
   }
@@ -161,10 +185,12 @@ public class ScannerCallable extends Ser
       return;
     }
     for (Result rr : rrs) {
-      this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
-      if (isRegionServerRemote) {
-        this.scanMetrics.countOfBytesInRemoteResults.inc(
-          rr.getBytes().getLength());
+      if (rr.getBytes() != null) {
+        this.scanMetrics.countOfBytesInResults.inc(rr.getBytes().getLength());
+        if (isRegionServerRemote) {
+          this.scanMetrics.countOfBytesInRemoteResults.inc(
+            rr.getBytes().getLength());
+        }
       }
     }
   }
@@ -175,7 +201,13 @@ public class ScannerCallable extends Ser
     }
     try {
       incRPCcallsMetrics();
-      this.server.close(this.scannerId);
+      ScanRequest request =
+        RequestConverter.buildScanRequest(this.scannerId, 0, true);
+      try {
+        server.scan(null, request);
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
     } catch (IOException e) {
       LOG.warn("Ignore, probably already closed", e);
     }
@@ -184,8 +216,16 @@ public class ScannerCallable extends Ser
 
   protected long openScanner() throws IOException {
     incRPCcallsMetrics();
-    return this.server.openScanner(this.location.getRegionInfo().getRegionName(),
-      this.scan);
+    ScanRequest request =
+      RequestConverter.buildScanRequest(
+        this.location.getRegionInfo().getRegionName(),
+        this.scan, 0, false);
+    try {
+      ScanResponse response = server.scan(null, request);
+      return response.getScannerId();
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
   }
 
   protected Scan getScan() {

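Taken together, ScannerCallable now drives the whole scanner lifecycle
through the one scan RPC in three request shapes. A condensed sketch with
argument order as used in the diff (ServiceException unwrapping elided for
brevity):

    static Result[] scanOnce(ClientProtocol server, byte[] regionName,
        Scan scan, int caching) throws IOException, ServiceException {
      // 1) open: region name + Scan, 0 rows, don't close
      long scannerId = server.scan(null,
          RequestConverter.buildScanRequest(regionName, scan, 0, false))
          .getScannerId();
      // 2) next: scanner id + caching count, don't close
      ScanResponse response = server.scan(null,
          RequestConverter.buildScanRequest(scannerId, caching, false));
      Result[] rrs = ResponseConverter.getResults(response);
      // hasMoreResults() true with getMoreResults() false => exhausted
      // 3) close: scanner id, 0 rows, closeScanner = true
      server.scan(null, RequestConverter.buildScanRequest(scannerId, 0, true));
      return rrs;
    }
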
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Fri Apr 13 20:28:21 2012
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.DoNotRetr
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.ipc.RemoteException;
 
@@ -57,7 +57,7 @@ public abstract class ServerCallable<T> 
   protected final byte [] tableName;
   protected final byte [] row;
   protected HRegionLocation location;
-  protected HRegionInterface server;
+  protected ClientProtocol server;
   protected int callTimeout;
   protected long startTime, endTime;
 
@@ -84,8 +84,8 @@ public abstract class ServerCallable<T> 
    */
   public void connect(final boolean reload) throws IOException {
     this.location = connection.getRegionLocation(tableName, row, reload);
-    this.server = connection.getHRegionConnection(location.getHostname(),
-      location.getPort());
+    this.server = connection.getClient(location.getHostname(),
+        location.getPort());
   }
 
   /** @return the server name
@@ -224,7 +224,7 @@ public abstract class ServerCallable<T> 
     }
   }
 
-  private static Throwable translateException(Throwable t) throws IOException {
+  protected static Throwable translateException(Throwable t) throws IOException {
     if (t instanceof UndeclaredThrowableException) {
       t = t.getCause();
     }

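Changing the type of ServerCallable.server is the pivot for every client
class above: subclasses now code against ClientProtocol and inherit region
lookup, connection setup and retries from withRetries(). The subclassing
pattern as it recurs through this commit (sketch; 'get' is assumed to be a
final Get in scope):

    Result result = new ServerCallable<Result>(connection, tableName, row) {
      public Result call() throws IOException {
        // 'server' is a ClientProtocol; 'location' was set by connect().
        GetRequest request = RequestConverter.buildGetRequest(
            location.getRegionInfo().getRegionName(), get);
        try {
          return ProtobufUtil.toResult(server.get(null, request).getResult());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    }.withRetries();
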
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/ParseConstants.java Fri Apr 13 20:28:21 2012
@@ -19,13 +19,10 @@
  */
 package org.apache.hadoop.hbase.filter;
 
+import java.nio.ByteBuffer;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import org.apache.hadoop.hbase.filter.*;
 
 /**
  * ParseConstants holds a bunch of constants related to parsing Filter Strings

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Fri Apr 13 20:28:21 2012
@@ -100,6 +100,7 @@ import org.apache.hadoop.io.WritableFact
 import org.apache.hadoop.io.WritableUtils;
 
 import com.google.protobuf.Message;
+import com.google.protobuf.RpcController;
 
 /**
  * This is a customized version of the polymorphic hadoop
@@ -268,6 +269,8 @@ public class HbaseObjectWritable impleme
     GENERIC_ARRAY_CODE = code++;
     addToMap(Array.class, GENERIC_ARRAY_CODE);
 
+    addToMap(RpcController.class, code++);
+
     // make sure that this is the last statement in this static block
     NEXT_CLASS_CODE = code;
   }
@@ -357,7 +360,7 @@ public class HbaseObjectWritable impleme
     }
   }
 
-  static Integer getClassCode(final Class<?> c)
+  public static Integer getClassCode(final Class<?> c)
   throws IOException {
     Integer code = CLASS_TO_CODE.get(c);
     if (code == null ) {
@@ -726,7 +729,7 @@ public class HbaseObjectWritable impleme
    * @return the instantiated Message instance
    * @throws IOException if an IO problem occurs
    */
-  private static Message tryInstantiateProtobuf(
+  public static Message tryInstantiateProtobuf(
       Class<?> protoClass,
       DataInput dataIn) throws IOException {
 

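Two small but load-bearing tweaks in HbaseObjectWritable: RpcController
gets a class code so the (in practice always null) controller argument of
the generated blocking-stub methods can travel through the Writable-based
Invocation, and getClassCode/tryInstantiateProtobuf go public so the RPC
layer can encode and decode protobuf Messages. A sketch of why the
controller slot matters (reflection lookup shown for illustration; checked
exceptions and the Invocation constructor signature are assumed):

    // Generated stub methods take (RpcController, Request); Invocation
    // serializes every parameter, so RpcController.class needs a code even
    // though the argument is null.
    Method m = ClientService.BlockingInterface.class.getMethod(
        "get", RpcController.class, GetRequest.class);
    Invocation call = new Invocation(m, new Object[] { null, request });
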
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Fri Apr 13 20:28:21 2012
@@ -110,6 +110,14 @@ public class TimeRange implements Writab
   }
 
   /**
+   * Check if it is for all time
+   * @return true if it is for all time
+   */
+  public boolean isAllTime() {
+    return allTime;
+  }
+
+  /**
    * Check if the specified timestamp is within this TimeRange.
    * <p>
    * Returns true if within interval [minStamp, maxStamp), false

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java Fri Apr 13 20:28:21 2012
@@ -19,18 +19,23 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.ServerCallable;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
-
 /**
  * Backs a {@link CoprocessorProtocol} subclass proxy and forwards method
  * invocations for server execution.  Note that internally this will issue a
@@ -74,8 +79,13 @@ public class ExecRPCInvoker implements I
       ServerCallable<ExecResult> callable =
           new ServerCallable<ExecResult>(connection, table, row) {
             public ExecResult call() throws Exception {
-              return server.execCoprocessor(location.getRegionInfo().getRegionName(),
-                  exec);
+              byte[] regionName = location.getRegionInfo().getRegionName();
+              ExecCoprocessorRequest request =
+                RequestConverter.buildExecCoprocessorRequest(regionName, exec);
+              ExecCoprocessorResponse response =
+                server.execCoprocessor(null, request);
+              Object value = ProtobufUtil.toObject(response.getValue());
+              return new ExecResult(regionName, value);
             }
           };
       ExecResult result = callable.withRetries();

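[Editorial note] For orientation, this is the path a coprocessor call takes before reaching the converted code above: HTable#coprocessorProxy hands back a dynamic proxy backed by ExecRPCInvoker, which now ships each method call as a protobuf ExecCoprocessorRequest. CounterProtocol and its count method below are hypothetical:

  import java.io.IOException;

  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
  import org.apache.hadoop.hbase.util.Bytes;

  public class ExecSketch {
    // Hypothetical endpoint interface; any CoprocessorProtocol subclass
    // is proxied the same way.
    public interface CounterProtocol extends CoprocessorProtocol {
      long count(byte[] family) throws IOException;
    }

    static long countOnRegion(HTable table, byte[] row) throws IOException {
      CounterProtocol endpoint =
          table.coprocessorProxy(CounterProtocol.class, row);
      // This call is intercepted by ExecRPCInvoker and sent as an
      // ExecCoprocessorRequest to the region hosting 'row'.
      return endpoint.count(Bytes.toBytes("cf"));
    }
  }
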
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Fri Apr 13 20:28:21 2012
@@ -19,19 +19,23 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.protobuf.ClientProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.io.VersionMismatchException;
 import org.apache.hadoop.io.VersionedWritable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-
 /** A method invocation, including the method name and its parameters.*/
 @InterfaceAudience.Private
 public class Invocation extends VersionedWritable implements Configurable {
@@ -43,6 +47,17 @@ public class Invocation extends Versione
   private long clientVersion;
   private int clientMethodsHash;
 
+
+  // For generated protocol classes which don't have a VERSION field,
+  // such as protobuf interfaces.
+  private static final Map<Class<?>, Long>
+    PROTOCOL_VERSION = new HashMap<Class<?>, Long>();
+
+  static {
+    PROTOCOL_VERSION.put(ClientService.BlockingInterface.class,
+      Long.valueOf(ClientProtocol.VERSION));
+  }
+
   private static byte RPC_VERSION = 1;
 
   public Invocation() {}
@@ -51,22 +66,28 @@ public class Invocation extends Versione
     this.methodName = method.getName();
     this.parameterClasses = method.getParameterTypes();
     this.parameters = parameters;
-    if (method.getDeclaringClass().equals(VersionedProtocol.class)) {
+    Class<?> declaringClass = method.getDeclaringClass();
+    if (declaringClass.equals(VersionedProtocol.class)) {
       //VersionedProtocol is exempted from version check.
       clientVersion = 0;
       clientMethodsHash = 0;
     } else {
       try {
-        Field versionField = method.getDeclaringClass().getField("VERSION");
-        versionField.setAccessible(true);
-        this.clientVersion = versionField.getLong(method.getDeclaringClass());
+        Long version = PROTOCOL_VERSION.get(declaringClass);
+        if (version != null) {
+          this.clientVersion = version.longValue();
+        } else {
+          Field versionField = declaringClass.getField("VERSION");
+          versionField.setAccessible(true);
+          this.clientVersion = versionField.getLong(declaringClass);
+        }
       } catch (NoSuchFieldException ex) {
-        throw new RuntimeException("The " + method.getDeclaringClass(), ex);
+        throw new RuntimeException("The " + declaringClass + " has no VERSION field", ex);
       } catch (IllegalAccessException ex) {
         throw new RuntimeException(ex);
       }
-      this.clientMethodsHash = ProtocolSignature.getFingerprint(method
-          .getDeclaringClass().getMethods());
+      this.clientMethodsHash = ProtocolSignature.getFingerprint(
+        declaringClass.getMethods());
     }
   }
 

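[Editorial note] Generated protobuf interfaces carry no VERSION field, so Invocation now consults the PROTOCOL_VERSION map before falling back to reflection. A condensed, self-contained restatement of that lookup order:

  import java.lang.reflect.Field;
  import java.util.HashMap;
  import java.util.Map;

  public class VersionLookupSketch {
    private static final Map<Class<?>, Long> PROTOCOL_VERSION =
        new HashMap<Class<?>, Long>();

    // Registry first (protobuf interfaces), then the reflective VERSION
    // field that Writable-era protocol interfaces declare.
    static long versionOf(Class<?> protocol)
        throws NoSuchFieldException, IllegalAccessException {
      Long version = PROTOCOL_VERSION.get(protocol);
      if (version != null) {
        return version.longValue();
      }
      Field versionField = protocol.getField("VERSION");
      versionField.setAccessible(true);
      return versionField.getLong(protocol);
    }
  }
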
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/WritableRpcEngine.java Fri Apr 13 20:28:21 2012
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.client.Operation;
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Objects;
@@ -52,6 +53,8 @@ import org.apache.hadoop.conf.*;
 
 import org.codehaus.jackson.map.ObjectMapper;
 
+import com.google.protobuf.ServiceException;
+
 /** An RpcEngine implementation for Writable data. */
 @InterfaceAudience.Private
 class WritableRpcEngine implements RpcEngine {
@@ -407,6 +410,9 @@ class WritableRpcEngine implements RpcEn
         if (target instanceof IOException) {
           throw (IOException)target;
         }
+        if (target instanceof ServiceException) {
+          throw ProtobufUtil.getRemoteException((ServiceException)target);
+        }
         IOException ioe = new IOException(target.toString());
         ioe.setStackTrace(target.getStackTrace());
         throw ioe;

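[Editorial note] The server invokes handler methods reflectively, so a protobuf ServiceException surfaces wrapped in an InvocationTargetException; the new branch recovers the remote IOException inside it. A condensed restatement of the unwrap order (the surrounding method shape is simplified):

  import java.io.IOException;
  import java.lang.reflect.InvocationTargetException;
  import java.lang.reflect.Method;

  import com.google.protobuf.ServiceException;

  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

  public class UnwrapSketch {
    static Object invoke(Method method, Object instance, Object[] args)
        throws IOException, IllegalAccessException {
      try {
        return method.invoke(instance, args);
      } catch (InvocationTargetException e) {
        Throwable target = e.getTargetException();
        if (target instanceof IOException) {
          throw (IOException) target;
        }
        if (target instanceof ServiceException) {
          // Pull the remote IOException back out of the PB wrapper.
          throw ProtobufUtil.getRemoteException((ServiceException) target);
        }
        IOException ioe = new IOException(target.toString());
        ioe.setStackTrace(target.getStackTrace());
        throw ioe;
      }
    }
  }
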
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1325937&r1=1325936&r2=1325937&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java Fri Apr 13 20:28:21 2012
@@ -68,10 +68,13 @@ import org.apache.hadoop.hbase.io.Refere
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
-import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@@ -486,7 +489,11 @@ public class LoadIncrementalHFiles exten
         LOG.debug("Going to connect to server " + location + " for row "
             + Bytes.toStringBinary(row));
         byte[] regionName = location.getRegionInfo().getRegionName();
-        return server.bulkLoadHFiles(famPaths, regionName);
+        BulkLoadHFileRequest request =
+          RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
+        BulkLoadHFileResponse response =
+          server.bulkLoadHFile(null, request);
+        return response.getLoaded();
       }
     };
 

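[Editorial note] The converted callable above builds its request from famPaths, a List<Pair<byte[], String>> mapping each column family to the HFile it should load (the element type is inferred from the surrounding code). A stand-alone sketch of the same construction, with a placeholder family and path:

  import java.util.ArrayList;
  import java.util.List;

  import org.apache.hadoop.hbase.protobuf.RequestConverter;
  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.apache.hadoop.hbase.util.Pair;

  public class BulkLoadRequestSketch {
    static BulkLoadHFileRequest build(byte[] regionName) {
      List<Pair<byte[], String>> famPaths =
          new ArrayList<Pair<byte[], String>>();
      // Placeholder family and HFile path; each pair loads one file
      // into one column family of the target region.
      famPaths.add(new Pair<byte[], String>(
          Bytes.toBytes("cf"), "/bulk/cf/hfile-0"));
      return RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
    }
  }
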
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/AdminProtocol.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.protobuf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that an HBase client uses to administer a region server
+ * (e.g. open, close, flush, split, and compact regions).
+ */
+@KerberosInfo(
+  serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Private
+public interface AdminProtocol extends
+    AdminService.BlockingInterface, VersionedProtocol {
+  public static final long VERSION = 1L;
+}

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java?rev=1325937&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/protobuf/ClientProtocol.java Fri Apr 13 20:28:21 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.protobuf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.VersionedProtocol;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.security.TokenInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol that an HBase client uses to read and write data on a
+ * region server.
+ */
+@KerberosInfo(
+  serverPrincipal = "hbase.regionserver.kerberos.principal")
+@TokenInfo("HBASE_AUTH_TOKEN")
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ClientProtocol extends
+    ClientService.BlockingInterface, VersionedProtocol {
+  public static final long VERSION = 1L;
+}
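
[Editorial note] Taken together, the two interfaces split the old HRegionInterface into admin and data-access halves. A hedged end-to-end sketch of a raw get through ClientProtocol; the HConnection#getClient accessor is assumed from this commit's changes to HConnection, and the ServiceException unwrap mirrors the WritableRpcEngine change above:

  import java.io.IOException;

  import com.google.protobuf.ServiceException;

  import org.apache.hadoop.hbase.client.HConnection;
  import org.apache.hadoop.hbase.protobuf.ClientProtocol;
  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;

  public class RawGetSketch {
    static GetResponse rawGet(HConnection connection, String hostname,
        int port, GetRequest request) throws IOException {
      // Assumed accessor: HConnection#getClient(hostname, port) returning
      // the PB-based ClientProtocol for that region server.
      ClientProtocol client = connection.getClient(hostname, port);
      try {
        return client.get(null, request);
      } catch (ServiceException se) {
        // Recover the remote IOException the PB layer wrapped.
        throw ProtobufUtil.getRemoteException(se);
      }
    }
  }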