You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/10/09 00:49:50 UTC

svn commit: r703013 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/ src/test/org/apache/hadoop/hbase/

Author: stack
Date: Wed Oct  8 15:49:50 2008
New Revision: 703013

URL: http://svn.apache.org/viewvc?rev=703013&view=rev
Log:
HBASE-576 Investigate IPC performance

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Wed Oct  8 15:49:50 2008
@@ -26,6 +26,7 @@
                (Doğacan Güney via Stack)
    HBASE-908   Add approximate counting to CountingBloomFilter
                (Andrzej Bialecki via Stack)
+   HBASE-576   Investigate IPC performance
 
   NEW FEATURES
    HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Wed Oct  8 15:49:50 2008
@@ -126,6 +126,11 @@
     addToMap(RowResult.class, code++);
     addToMap(HRegionInfo[].class, code++);
     addToMap(MapWritable.class, code++);
+    try {
+      addToMap(Class.forName("[Lorg.apache.hadoop.hbase.io.RowResult;"), code++);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
   }
   
   private Class<?> declaredClass;

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Wed Oct  8 15:49:50 2008
@@ -38,8 +38,9 @@
    * -- HADOOP-2495 and then to 3 when we changed the RPC to send codes instead
    * of actual class names (HADOOP-2519).
    * <p>Version 4 when we moved to all byte arrays (HBASE-42).
+   * <p>Version 5 for HBASE-576.
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 5L;
 
   /** @return true if master is available */
   public boolean isMasterRunning();
@@ -126,4 +127,4 @@
    * @return address of server that serves the root region
    */
   public HServerAddress findRootRegion();
-}
\ No newline at end of file
+}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java Wed Oct  8 15:49:50 2008
@@ -30,6 +30,7 @@
 /**
  * HRegionServers interact with the HMasterRegionInterface to report on local 
  * goings-on and to obtain data-handling instructions from the HMaster.
+ * <p>Changes here need to be reflected in HbaseObjectWritable and HbaseRPC#Invocation.
  */
 public interface HMasterRegionInterface extends VersionedProtocol {
   /**
@@ -38,8 +39,9 @@
    * MapWritable instead of a HbaseMapWritable as part of HBASE-82 changes.
    * Version 3 was when HMsg was refactored so it could carry optional
    * messages (HBASE-504).
+   * <p>HBASE-576 we moved this to 4.
    */
-  public static final long versionID = 3L;
+  public static final long versionID = 4L;
 
   /**
    * Called when a region server first starts

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Oct  8 15:49:50 2008
@@ -37,8 +37,9 @@
   /**
    * Protocol version.
    * Upped to 5 when we added scanner caching
+   * <p>HBASE-576, we moved this to 6.
    */
-  public static final long versionID = 5L;
+  public static final long versionID = 6L;
 
   /** 
    * Get metainfo about an HRegion
@@ -220,4 +221,4 @@
    */
   public void unlockRow(final byte [] regionName, final long lockId)
   throws IOException;
-}
\ No newline at end of file
+}

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java Wed Oct  8 15:49:50 2008
@@ -30,6 +30,8 @@
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,8 +42,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.HBaseClient;
@@ -82,11 +83,28 @@
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HbaseRPC");
 
-  private HbaseRPC() {}                                  // no public ctor
+  private HbaseRPC() {
+    super();
+  }                                  // no public ctor
 
 
   /** A method invocation, including the method name and its parameters.*/
   private static class Invocation implements Writable, Configurable {
+    // Here we maintain two static maps of method names to code and vice versa.
+    private static final Map<Byte, String> CODE_TO_METHODNAME =
+      new HashMap<Byte, String>();
+    private static final Map<String, Byte> METHODNAME_TO_CODE =
+      new HashMap<String, Byte>();
+    // Special code that means 'not-encoded'.
+    private static final byte NOT_ENCODED = 0;
+    static {
+      byte code = NOT_ENCODED + 1;
+      code = addToMap(VersionedProtocol.class, code);
+      code = addToMap(HMasterInterface.class, code);
+      code = addToMap(HMasterRegionInterface.class, code);
+      code = addToMap(TransactionalRegionInterface.class, code);
+    }
+
     private String methodName;
     @SuppressWarnings("unchecked")
     private Class[] parameterClasses;
@@ -94,7 +112,9 @@
     private Configuration conf;
 
     /** default constructor */
-    public Invocation() {}
+    public Invocation() {
+      super();
+    }
 
     /**
      * @param method
@@ -117,21 +137,23 @@
     public Object[] getParameters() { return parameters; }
 
     public void readFields(DataInput in) throws IOException {
-      methodName = Text.readString(in);
+      byte code = in.readByte();
+      methodName = CODE_TO_METHODNAME.get(Byte.valueOf(code));
       parameters = new Object[in.readInt()];
       parameterClasses = new Class[parameters.length];
-      ObjectWritable objectWritable = new ObjectWritable();
+      HbaseObjectWritable objectWritable = new HbaseObjectWritable();
       for (int i = 0; i < parameters.length; i++) {
-        parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
+        parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
+          this.conf);
         parameterClasses[i] = objectWritable.getDeclaredClass();
       }
     }
 
     public void write(DataOutput out) throws IOException {
-      Text.writeString(out, methodName);
+      writeMethodNameCode(out, this.methodName);
       out.writeInt(parameterClasses.length);
       for (int i = 0; i < parameterClasses.length; i++) {
-        ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+        HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
                                    conf);
       }
     }
@@ -157,7 +179,54 @@
     public Configuration getConf() {
       return this.conf;
     }
+    
+    private static void addToMap(final String name, final byte code) {
+      if (METHODNAME_TO_CODE.containsKey(name)) {
+        return;
+      }
+      METHODNAME_TO_CODE.put(name, Byte.valueOf(code));
+      CODE_TO_METHODNAME.put(Byte.valueOf(code), name);
+    }
+    
+    /*
+     * @param c Class whose methods we'll add to the map of methods to codes
+     * (and vice versa).
+     * @param code Current state of the byte code.
+     * @return State of <code>code</code> when this method is done.
+     */
+    private static byte addToMap(final Class<?> c, final byte code) {
+      byte localCode = code;
+      Method [] methods = c.getMethods();
+      // There are no guarantees about the order in which getMethods() returns
+      // its items, so sort by name (Was seeing one order on one server and a
+      // different order on another server).
+      Arrays.sort(methods, new Comparator<Method>() {
+        public int compare(Method left, Method right) {
+          return left.getName().compareTo(right.getName());
+        }
+      });
+      for (int i = 0; i < methods.length; i++) {
+        addToMap(methods[i].getName(), localCode++);
+      }
+      return localCode;
+    }
 
+    /*
+     * Write out the code byte for the passed method name.
+     * @param out
+     * @param methodname
+     * @throws IOException
+     */
+    static void writeMethodNameCode(final DataOutput out, final String methodname)
+    throws IOException {
+      Byte code = METHODNAME_TO_CODE.get(methodname);
+      if (code == null) {
+        LOG.error("Unsupported type " + methodname);
+        throw new UnsupportedOperationException("No code for unexpected " +
+          methodname);
+      }
+      out.writeByte(code.byteValue());
+    }
   }
 
   /* Cache a client using its socket factory as the hash key */
@@ -181,7 +250,7 @@
       // per-job, we choose (a).
       Client client = clients.get(factory);
       if (client == null) {
-        client = new HBaseClient(ObjectWritable.class, conf, factory);
+        client = new HBaseClient(HbaseObjectWritable.class, conf, factory);
         clients.put(factory, client);
       } else {
         ((HBaseClient)client).incCount();
@@ -217,7 +286,7 @@
     }
   }
 
-  private static ClientCache CLIENTS=new ClientCache();
+  private static ClientCache CLIENTS = new ClientCache();
   
   private static class Invoker implements InvocationHandler {
     private InetSocketAddress address;
@@ -242,7 +311,7 @@
         Method method, Object[] args)
       throws Throwable {
       long startTime = System.currentTimeMillis();
-      ObjectWritable value = (ObjectWritable)
+      HbaseObjectWritable value = (HbaseObjectWritable)
         client.call(new Invocation(method, args), address, ticket);
       long callTime = System.currentTimeMillis() - startTime;
       LOG.debug("Call: " + method.getName() + " " + callTime);
@@ -379,8 +448,8 @@
    */
   public static VersionedProtocol getProxy(Class<?> protocol,
       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
-      Configuration conf, SocketFactory factory) throws IOException {    
-
+      Configuration conf, SocketFactory factory)
+  throws IOException {    
     VersionedProtocol proxy =
         (VersionedProtocol) Proxy.newProxyInstance(
             protocol.getClassLoader(), new Class[] { protocol },
@@ -452,7 +521,7 @@
       (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
     for (int i = 0; i < values.length; i++)
       if (wrappedValues[i] != null)
-        values[i] = ((ObjectWritable)wrappedValues[i]).get();
+        values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
     
     return values;
     } finally {
@@ -545,7 +614,6 @@
       try {
         Invocation call = (Invocation)param;
         if (verbose) log("Call: " + call);
-        
         Method method =
           implementation.getMethod(call.getMethodName(),
                                    call.getParameterClasses());
@@ -573,7 +641,7 @@
 
         if (verbose) log("Return: "+value);
 
-        return new ObjectWritable(method.getReturnType(), value);
+        return new HbaseObjectWritable(method.getReturnType(), value);
 
       } catch (InvocationTargetException e) {
         Throwable target = e.getTargetException();

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java Wed Oct  8 15:49:50 2008
@@ -29,8 +29,10 @@
  * 
  */
 public interface TransactionalRegionInterface extends HRegionInterface {
-  /** Interface version number */
-  public static final long versionID = 1L;
+  /** Interface version number
+   *  Moved to 2 for hbase-576.
+   */
+  public static final long versionID = 2L;
 
   /**
    * Sent to initiate a transaction.

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java Wed Oct  8 15:49:50 2008
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.PrintStream;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -51,7 +52,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.log4j.Logger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 
 /**
@@ -71,12 +73,12 @@
  * runs an individual client. Each client does about 1GB of data.
  */
 public class PerformanceEvaluation implements HConstants {
-  static final Logger LOG =
-    Logger.getLogger(PerformanceEvaluation.class.getName());
+  private static final Log LOG = LogFactory.getLog(PerformanceEvaluation.class.getName());
   
   private static final int ROW_LENGTH = 1000;
   private static final int ONE_GB = 1024 * 1024 * 1000;
   private static final int ROWS_PER_GB = ONE_GB / ROW_LENGTH;
+  
   static final byte [] COLUMN_NAME = Bytes.toBytes(COLUMN_FAMILY_STR + "data");
   
   protected static HTableDescriptor tableDescriptor;
@@ -102,6 +104,7 @@
   
   volatile HBaseConfiguration conf;
   private boolean miniCluster = false;
+  private boolean nomapred = false;
   private int N = 1;
   private int R = ROWS_PER_GB;
   private static final Path PERF_EVAL_DIR = new Path("performance_evaluation");
@@ -223,10 +226,68 @@
   private void runNIsMoreThanOne(final String cmd)
   throws IOException {
     checkTable(new HBaseAdmin(conf));
-    
-    // Run a mapreduce job.  Run as many maps as asked-for clients.
-    // Before we start up the job, write out an input file with instruction
-    // per client regards which row they are to start on.
+    if (this.nomapred) {
+      doMultipleClients(cmd);
+    } else {
+      doMapReduce(cmd);
+    }
+  }
+  
+  /*
+   * Run all clients in this VM, each in its own thread.
+   * @param cmd Command to run.
+   * @throws IOException
+   */
+  @SuppressWarnings("unused")
+  private void doMultipleClients(final String cmd) throws IOException {
+    final List<Thread> threads = new ArrayList<Thread>(this.N);
+    final int perClientRows = R/N;
+    for (int i = 0; i < this.N; i++) {
+      Thread t = new Thread (Integer.toString(i)) {
+        @Override
+        public void run() {
+          super.run();
+          PerformanceEvaluation pe = new PerformanceEvaluation(conf);
+          int index = Integer.parseInt(getName());
+          try {
+            long elapsedTime = pe.runOneClient(cmd, index * perClientRows,
+               perClientRows, perClientRows,
+               new Status() {
+                  public void setStatus(final String msg) throws IOException {
+                    LOG.info("client-" + getName() + " " + msg);
+                  }
+                });
+            LOG.info("Finished " + getName() + " in " + elapsedTime +
+              "ms writing " + perClientRows + " rows");
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      };
+      threads.add(t);
+    }
+    for (Thread t: threads) {
+      t.start();
+    }
+    for (Thread t: threads) {
+      while(t.isAlive()) {
+        try {
+          t.join();
+        } catch (InterruptedException e) {
+          LOG.debug("Interrupted, continuing" + e.toString());
+        }
+      }
+    }
+  }
+  
+  /*
+   * Run a mapreduce job.  Run as many maps as asked-for clients.
+   * Before we start up the job, write out an input file with instructions
+   * per client regarding which row they are to start on.
+   * @param cmd Command to run.
+   * @throws IOException
+   */
+  private void doMapReduce(final String cmd) throws IOException {
     Path inputDir = writeInputFile(this.conf);
     this.conf.set(EvaluationMapTask.CMD_KEY, cmd);
     JobConf job = new JobConf(this.conf, this.getClass());
@@ -274,7 +335,7 @@
     }
     return subdir;
   }
-  
+
   /*
    * A test.
    * Subclass to particularize what happens per row.
@@ -559,7 +620,7 @@
     if (cmd.equals(RANDOM_READ_MEM)) {
       // For this one test, so all fits in memory, make R smaller (See
       // pg. 9 of BigTable paper).
-      R = (ONE_GB / 10) * N;
+      R = (this.R / 10) * N;
     }
     
     MiniHBaseCluster hbaseMiniCluster = null;
@@ -574,7 +635,6 @@
       conf.set(HConstants.HBASE_DIR, parentdir.toString());
       fs.mkdirs(parentdir);
       FSUtils.setVersion(fs, parentdir);
-      
       hbaseMiniCluster = new MiniHBaseCluster(this.conf, N);
     }
     
@@ -604,13 +664,17 @@
       System.err.println(message);
     }
     System.err.println("Usage: java " + this.getClass().getName() +
-        "[--master=host:port] [--miniCluster] <command> <nclients>");
+        " [--master=HOST:PORT] \\");
+    System.err.println("  [--miniCluster] [--nomapred] [--rows=ROWS] <command> <nclients>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" master          Specify host and port of HBase " +
         "cluster master. If not present,");
     System.err.println("                 address is read from configuration");
     System.err.println(" miniCluster     Run the test on an HBaseMiniCluster");
+    System.err.println(" nomapred        Run multiple clients using threads " +
+      "(rather than use mapreduce)");
+    System.err.println(" rows            Rows each client runs. Default: One million");
     System.err.println();
     System.err.println("Command:");
     System.err.println(" randomRead      Run random read test");
@@ -643,7 +707,7 @@
     }
    
     // Set total number of rows to write.
-    R = ROWS_PER_GB * N;
+    this.R = this.R * N;
   }
   
   private int doCommandLine(final String[] args) {
@@ -675,6 +739,18 @@
           this.miniCluster = true;
           continue;
         }
+        
+        final String nmr = "--nomapred";
+        if (cmd.startsWith(nmr)) {
+          this.nomapred = true;
+          continue;
+        }
+        
+        final String rows = "--rows=";
+        if (cmd.startsWith(rows)) {
+          this.R = Integer.parseInt(cmd.substring(rows.length()));
+          continue;
+        }
        
         if (COMMANDS.contains(cmd)) {
           getArgs(i + 1, args);
@@ -699,6 +775,6 @@
   public static void main(final String[] args) {
     HBaseConfiguration c = new HBaseConfiguration();
     System.exit(new PerformanceEvaluation(c).
-      doCommandLine(args));
+    doCommandLine(args));
   }
 }

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java?rev=703013&r1=703012&r2=703013&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestCompare.java Wed Oct  8 15:49:50 2008
@@ -59,7 +59,7 @@
    */
   public void testHStoreKeyBorderCases() {
     HRegionInfo info = new HRegionInfo(new HTableDescriptor("testtable"),
-        HConstants.EMPTY_BYTE_ARRAY,HConstants.EMPTY_BYTE_ARRAY);
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
     HStoreKey rowA = new HStoreKey("testtable,www.hbase.org/,1234",
         "", Long.MAX_VALUE, info);
     HStoreKey rowB = new HStoreKey("testtable,www.hbase.org/%20,99999",