You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ga...@apache.org on 2011/02/04 19:25:40 UTC

svn commit: r1067252 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/client/coprocessor/ src/main/java/org/apache/hadoop/hbase/ipc/ src/main/java/org/apache/hadoop/hbase/util/ src/test/java/org/apache/hadoop/hbase/coprocessor/

Author: garyh
Date: Fri Feb  4 18:25:40 2011
New Revision: 1067252

URL: http://svn.apache.org/viewvc?rev=1067252&view=rev
Log:
HBASE-3400  Coprocessor Support for Generic Interfaces

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Classes.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericEndpoint.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericProtocol.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1067252&r1=1067251&r2=1067252&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Feb  4 18:25:40 2011
@@ -40,6 +40,8 @@ Release 0.91.0 - Unreleased
                the query matcher and can lead to incorrect behavior
    HBASE-3492  NPE while splitting table with empty column family store
    HBASE-3495  Shell is failing on subsequent split calls
+   HBASE-3400  Coprocessor Support for Generic Interfaces
+               (Ed Kohlwey via Gary Helmling)
 
 
   IMPROVEMENTS

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java?rev=1067252&r1=1067251&r2=1067252&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/Exec.java Fri Feb  4 18:25:40 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.io.HbaseO
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
 import org.apache.hadoop.hbase.ipc.Invocation;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Classes;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -83,14 +84,37 @@ public class Exec extends Invocation imp
 
   @Override
   public void write(DataOutput out) throws IOException {
-    super.write(out);
+    // fields for Invocation
+    out.writeUTF(this.methodName);
+    out.writeInt(parameterClasses.length);
+    for (int i = 0; i < parameterClasses.length; i++) {
+      HbaseObjectWritable.writeObject(out, parameters[i], parameters[i].getClass(),
+                                 conf);
+      out.writeUTF(parameterClasses[i].getName());
+    }
+    // fields for Exec
     Bytes.writeByteArray(out, referenceRow);
     out.writeUTF(protocol.getName());
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
+    // fields for Invocation
+    methodName = in.readUTF();
+    parameters = new Object[in.readInt()];
+    parameterClasses = new Class[parameters.length];
+    HbaseObjectWritable objectWritable = new HbaseObjectWritable();
+    for (int i = 0; i < parameters.length; i++) {
+      parameters[i] = HbaseObjectWritable.readObject(in, objectWritable,
+        this.conf);
+      String parameterClassName = in.readUTF();
+      try {
+        parameterClasses[i] = Classes.extendedForName(parameterClassName);
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Couldn't find class: " + parameterClassName);
+      }
+    }
+    // fields for Exec
     referenceRow = Bytes.readByteArray(in);
     String protocolName = in.readUTF();
     try {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java?rev=1067252&r1=1067251&r2=1067252&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/coprocessor/ExecResult.java Fri Feb  4 18:25:40 2011
@@ -21,11 +21,13 @@ package org.apache.hadoop.hbase.client.c
 
 import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Classes;
 import org.apache.hadoop.io.Writable;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 
 /**
  * Represents the return value from a
@@ -70,12 +72,25 @@ public class ExecResult implements Writa
   public void write(DataOutput out) throws IOException {
     Bytes.writeByteArray(out, regionName);
     HbaseObjectWritable.writeObject(out, value,
-        (valueType != null ? valueType : Writable.class), null);
+        value.getClass(), null);
+    Class<?> alternativeSerializationClass;
+    if(value instanceof Writable){
+      alternativeSerializationClass = Writable.class;
+    } else {
+      alternativeSerializationClass = Serializable.class;
+    }
+    out.writeUTF((valueType != null ? valueType : alternativeSerializationClass).getName());
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
     regionName = Bytes.readByteArray(in);
     value = HbaseObjectWritable.readObject(in, null);
+    String className = in.readUTF();
+    try {
+      valueType = Classes.extendedForName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to find class of type: " + className );
+    }
   }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java?rev=1067252&r1=1067251&r2=1067252&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/ipc/Invocation.java Fri Feb  4 18:25:40 2011
@@ -31,11 +31,11 @@ import java.lang.reflect.Method;
 
 /** A method invocation, including the method name and its parameters.*/
 public class Invocation implements Writable, Configurable {
-  private String methodName;
+  protected String methodName;
   @SuppressWarnings("unchecked")
-  private Class[] parameterClasses;
-  private Object[] parameters;
-  private Configuration conf;
+  protected Class[] parameterClasses;
+  protected Object[] parameters;
+  protected Configuration conf;
 
   public Invocation() {}
 

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Classes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Classes.java?rev=1067252&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Classes.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Classes.java Fri Feb  4 18:25:40 2011
@@ -0,0 +1,44 @@
+package org.apache.hadoop.hbase.util;
+
+/**
+ * Utilities for class manipulation.
+ */
+public class Classes {
+
+  /**
+   * Equivalent of {@link Class#forName(String)} which also returns classes for
+   * primitives like <code>boolean</code>, etc.
+   * 
+   * @param className
+   *          The name of the class to retrieve. Can be either a normal class or
+   *          a primitive class.
+   * @return The class specified by <code>className</code>
+   * @throws ClassNotFoundException
+   *           If the requested class can not be found.
+   */
+  public static Class<?> extendedForName(String className)
+      throws ClassNotFoundException {
+    Class<?> valueType;
+    if (className.equals("boolean")) {
+      valueType = boolean.class;
+    } else if (className.equals("byte")) {
+      valueType = byte.class;
+    } else if (className.equals("short")) {
+      valueType = short.class;
+    } else if (className.equals("int")) {
+      valueType = int.class;
+    } else if (className.equals("long")) {
+      valueType = long.class;
+    } else if (className.equals("float")) {
+      valueType = float.class;
+    } else if (className.equals("double")) {
+      valueType = double.class;
+    } else if (className.equals("char")) {
+      valueType = char.class;
+    } else {
+      valueType = Class.forName(className);
+    }
+    return valueType;
+  }
+
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericEndpoint.java?rev=1067252&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericEndpoint.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericEndpoint.java Fri Feb  4 18:25:40 2011
@@ -0,0 +1,11 @@
+package org.apache.hadoop.hbase.coprocessor;
+
+public class GenericEndpoint extends BaseEndpointCoprocessor implements
+    GenericProtocol {
+
+  @Override
+  public <T> T doWork(T genericObject) {
+    return genericObject;
+  }
+
+}

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericProtocol.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericProtocol.java?rev=1067252&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericProtocol.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/GenericProtocol.java Fri Feb  4 18:25:40 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+public interface GenericProtocol extends CoprocessorProtocol {
+
+  /**
+   * Simple interface to allow the passing of a generic parameter to see if the
+   * RPC framework can accommodate generics.
+   * 
+   * @param <T>
+   * @param genericObject
+   * @return
+   */
+  public <T> T doWork(T genericObject);
+
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java?rev=1067252&r1=1067251&r2=1067252&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java Fri Feb  4 18:25:40 2011
@@ -19,17 +19,25 @@
  */
 package org.apache.hadoop.hbase.coprocessor;
 
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.client.coprocessor.*;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.*;
-import org.apache.hadoop.conf.Configuration;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
-import java.util.Map;
 import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
  * TestEndpoint: test cases to verify coprocessor Endpoint
@@ -39,12 +47,12 @@ public class TestCoprocessorEndpoint {
   private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
   private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
   private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
-  private static byte [] ROW = Bytes.toBytes("testRow");
+  private static byte[] ROW = Bytes.toBytes("testRow");
 
   private static final int ROWSIZE = 20;
   private static final int rowSeperator1 = 5;
   private static final int rowSeperator2 = 12;
-  private static byte [][] ROWS = makeN(ROW, ROWSIZE);
+  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
 
   private static HBaseTestingUtility util = new HBaseTestingUtility();
   private static MiniHBaseCluster cluster = null;
@@ -53,18 +61,19 @@ public class TestCoprocessorEndpoint {
   public static void setupBeforeClass() throws Exception {
     // set configure to indicate which cp should be loaded
     Configuration conf = util.getConfiguration();
-    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
-        "org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint");
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint",
+        "org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
 
     util.startMiniCluster(2);
     cluster = util.getMiniHBaseCluster();
 
     HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
     util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
-        new byte[][]{ HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],
-      ROWS[rowSeperator2]});
+                            new byte[][] { HConstants.EMPTY_BYTE_ARRAY,
+                                ROWS[rowSeperator1], ROWS[rowSeperator2] });
 
-    for(int i = 0; i < ROWSIZE; i++) {
+    for (int i = 0; i < ROWSIZE; i++) {
       Put put = new Put(ROWS[i]);
       put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
       table.put(put);
@@ -80,27 +89,56 @@ public class TestCoprocessorEndpoint {
   }
 
   @Test
+  public void testGeneric() throws Throwable {
+    HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+    GenericProtocol protocol = table.coprocessorProxy(GenericProtocol.class,
+                                                      Bytes.toBytes("testRow"));
+    String workResult1 = protocol.doWork("foo");
+    assertEquals("foo", workResult1);
+    byte[] workResult2 = protocol.doWork(new byte[]{1});
+    assertArrayEquals(new byte[]{1}, workResult2);
+    byte workResult3 = protocol.doWork((byte)1);
+    assertEquals((byte)1, workResult3);
+    char workResult4 = protocol.doWork('c');
+    assertEquals('c', workResult4);
+    boolean workResult5 = protocol.doWork(true);
+    assertEquals(true, workResult5);
+    short workResult6 = protocol.doWork((short)1);
+    assertEquals((short)1, workResult6);
+    int workResult7 = protocol.doWork(5);
+    assertEquals(5, workResult7);
+    long workResult8 = protocol.doWork(5l);
+    assertEquals(5l, workResult8);
+    double workResult9 = protocol.doWork(6d);
+    assertEquals(6d, workResult9, 0.01);
+    float workResult10 = protocol.doWork(6f);
+    assertEquals(6f, workResult10, 0.01);
+    Text workResult11 = protocol.doWork(new Text("foo"));
+    assertEquals(new Text("foo"), workResult11);
+  }
+
+  @Test
   public void testAggregation() throws Throwable {
     HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
     Scan scan;
     Map<byte[], Long> results;
 
     // scan: for all regions
-    results = table.coprocessorExec(ColumnAggregationProtocol.class,
-        ROWS[rowSeperator1 - 1],
-        ROWS[rowSeperator2 + 1],
-        new Batch.Call<ColumnAggregationProtocol,Long>() {
-          public Long call(ColumnAggregationProtocol instance)
-          throws IOException{
-            return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
-          }
-        });
+    results = table
+        .coprocessorExec(ColumnAggregationProtocol.class,
+                         ROWS[rowSeperator1 - 1], ROWS[rowSeperator2 + 1],
+                         new Batch.Call<ColumnAggregationProtocol, Long>() {
+                           public Long call(ColumnAggregationProtocol instance)
+                               throws IOException {
+                             return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
+                           }
+                         });
     int sumResult = 0;
     int expectedResult = 0;
     for (Map.Entry<byte[], Long> e : results.entrySet()) {
       sumResult += e.getValue();
     }
-    for(int i = 0;i < ROWSIZE; i++) {
+    for (int i = 0; i < ROWSIZE; i++) {
       expectedResult += i;
     }
     assertEquals("Invalid result", sumResult, expectedResult);
@@ -108,29 +146,29 @@ public class TestCoprocessorEndpoint {
     results.clear();
 
     // scan: for region 2 and region 3
-    results = table.coprocessorExec(ColumnAggregationProtocol.class,
-        ROWS[rowSeperator1 + 1],
-        ROWS[rowSeperator2 + 1],
-        new Batch.Call<ColumnAggregationProtocol,Long>() {
-          public Long call(ColumnAggregationProtocol instance)
-          throws IOException{
-            return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
-          }
-        });
+    results = table
+        .coprocessorExec(ColumnAggregationProtocol.class,
+                         ROWS[rowSeperator1 + 1], ROWS[rowSeperator2 + 1],
+                         new Batch.Call<ColumnAggregationProtocol, Long>() {
+                           public Long call(ColumnAggregationProtocol instance)
+                               throws IOException {
+                             return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
+                           }
+                         });
     sumResult = 0;
     expectedResult = 0;
     for (Map.Entry<byte[], Long> e : results.entrySet()) {
       sumResult += e.getValue();
     }
-    for(int i = rowSeperator1;i < ROWSIZE; i++) {
+    for (int i = rowSeperator1; i < ROWSIZE; i++) {
       expectedResult += i;
     }
     assertEquals("Invalid result", sumResult, expectedResult);
   }
 
-  private static byte [][] makeN(byte [] base, int n) {
-    byte [][] ret = new byte[n][];
-    for(int i=0;i<n;i++) {
+  private static byte[][] makeN(byte[] base, int n) {
+    byte[][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
       ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
     }
     return ret;