Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/17 02:49:17 UTC

svn commit: r1588118 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/ipc/thrift/ main/java/org...

Author: liyin
Date: Thu Apr 17 00:49:17 2014
New Revision: 1588118

URL: http://svn.apache.org/r1588118
Log:
[master] Support arbitrary parameters and return values in endpoints.

Author: daviddeng

Summary: Implements `EndpointBytesCodec`, which converts between primitive types and byte arrays.
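
For illustration, the calling pattern this enables (a sketch based on the
`ISummer` endpoint in the updated `TestEndpoint`; `cp` is assumed to be an
`IEndpointClient`, e.g. an `HTable`):

    // Endpoint methods may now declare typed parameters and return values;
    // EndpointBytesCodec performs the byte[] conversion on both sides.
    public interface ISummer extends IEndpoint {
      long sum(int offset) throws IOException;
    }

    // Client side: the result map is keyed by HRegionInfo, values decoded.
    Map<HRegionInfo, Long> results = cp.coprocessorEndpoint(ISummer.class,
        null, null, new Caller<ISummer, Long>() {
          @Override
          public Long call(ISummer client) throws IOException {
            return client.sum(0);
          }
        });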

Test Plan:
`TestEndpoint` was changed.
`TestEndpointBytesCodec` was added.

Reviewers: adela, gauravm, manukranthk

Reviewed By: adela

CC: hbase-eng@, andrewcox

Differential Revision: https://phabricator.fb.com/D1265851

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java
      - copied, changed from r1588117, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Apr 17 00:49:17 2014
@@ -1528,8 +1528,8 @@ getRegionCachePrefetch(new StringBytes(t
   }
 
   @Override
-  public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(
-      Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller)
+  public <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint(
+      Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller)
       throws IOException {
     return this.endpointClient.coprocessorEndpoint(clazz, startRow, stopRow,
         caller);

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java?rev=1588118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java Thu Apr 17 00:49:17 2014
@@ -0,0 +1,258 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Codec for endpoints that converts between types and byte arrays.
+ *
+ * TODO add testcase for this class.
+ */
+public class EndpointBytesCodec {
+
+  /**
+   * The interface of a decoder.
+   */
+  public interface IBytesDecoder {
+    /**
+     * Decodes a byte array into an object.
+     */
+    Object decode(byte[] bytes);
+  }
+
+  /**
+   * The interface of an encoder.
+   */
+  public interface IBytesEncoder {
+    /**
+     * Encodes an object into a byte array.
+     */
+    public byte[] encode(Object obj);
+  }
+
+  /**
+   * Mapping from class types to IBytesDecoders.
+   */
+  private static Map<Class<?>, IBytesDecoder> decoders = new HashMap<>();
+
+  /**
+   * Mapping from class types to IBytesEncoders.
+   */
+  private static Map<Class<?>, IBytesEncoder> encoders = new HashMap<>();
+
+  /**
+   * Connects an encoder and a decoder with some classes.
+   *
+   * @param classes All classes in this array will be connected to the codecs.
+   */
+  private static void addCodec(Class<?>[] classes, IBytesEncoder enc,
+      IBytesDecoder dec) {
+    for (Class<?> cls : classes) {
+      encoders.put(cls, enc);
+      decoders.put(cls, dec);
+    }
+  }
+
+  private static final byte[] BYTES_FALSE = { 0 };
+  private static final byte[] BYTES_TRUE = { 1 };
+
+  static {
+    addCodec(new Class<?>[] { byte[].class }, new IBytesEncoder() {
+      @Override
+      public byte[] encode(Object obj) {
+        return (byte[]) obj;
+      }
+    }, new IBytesDecoder() {
+      @Override
+      public Object decode(byte[] param) {
+        if (param == null) {
+          return HConstants.EMPTY_BYTE_ARRAY;
+        }
+        return param;
+      }
+    });
+    addCodec(new Class<?>[] { String.class }, new IBytesEncoder() {
+      @Override
+      public byte[] encode(Object obj) {
+        return Bytes.toBytes((String) obj);
+      }
+    }, new IBytesDecoder() {
+      @Override
+      public Object decode(byte[] param) {
+        return Bytes.toString(param);
+      }
+    });
+    addCodec(new Class<?>[] { Boolean.class, boolean.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return ((Boolean) obj) ? BYTES_TRUE : BYTES_FALSE;
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return param[0] != 0;
+          }
+        });
+    addCodec(new Class<?>[] { Byte.class, byte.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return new byte[]{(Byte) obj};
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return param[0];
+          }
+        });
+    addCodec(new Class<?>[] { Character.class, char.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return Bytes.toBytes((short) ((Character) obj).charValue());
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return (char) Bytes.toShort(param);
+          }
+        });
+    addCodec(new Class<?>[] { Short.class, short.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return Bytes.toBytes((Short) obj);
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return Bytes.toShort(param);
+          }
+        });
+    addCodec(new Class<?>[] { Integer.class, int.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return Bytes.toBytes((Integer) obj);
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return Bytes.toInt(param);
+          }
+        });
+    addCodec(new Class<?>[] { Long.class, long.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return Bytes.toBytes((Long) obj);
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return Bytes.toLong(param);
+          }
+        });
+    addCodec(new Class<?>[] { Float.class, float.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return Bytes.toBytes((Float) obj);
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return Bytes.toFloat(param);
+          }
+        });
+    addCodec(new Class<?>[] { Double.class, double.class },
+        new IBytesEncoder() {
+          @Override
+          public byte[] encode(Object obj) {
+            return Bytes.toBytes((Double) obj);
+          }
+        }, new IBytesDecoder() {
+          @Override
+          public Object decode(byte[] param) {
+            return Bytes.toDouble(param);
+          }
+        });
+  }
+
+  /**
+   * @return an IBytesDecoder for a specified type. null is returned if not
+   *          supported.
+   */
+  public static IBytesDecoder findDecoder(Class<?> type) {
+    // TODO daviddeng support arrays of supported types.
+    return decoders.get(type);
+  }
+
+  /**
+   * @return an IBytesEncoder for a specified type. null is returned if not
+   *         supported.
+   */
+  public static IBytesEncoder findEncoder(Class<?> type) {
+    // TODO daviddeng support arrays of supported types.
+    return encoders.get(type);
+  }
+
+  /**
+   * Decodes a byte array into an object of the specified type.
+   */
+  public static Object decode(Class<?> type, byte[] bytes) {
+    return decoders.get(type).decode(bytes);
+  }
+
+  /**
+   * Encodes an Object into a byte array.
+   */
+  public static byte[] encodeObject(Object obj) {
+    if (obj == null) {
+      // We don't distinguish between null and a zero-length byte array.
+      return HConstants.EMPTY_BYTE_ARRAY;
+    }
+
+    IBytesEncoder enc = findEncoder(obj.getClass());
+    if (enc == null) {
+      throw new UnsupportedTypeException(obj.getClass());
+    }
+    return enc.encode(obj);
+  }
+
+  /**
+   * Encodes an array of Objects into an ArrayList of byte arrays.
+   */
+  public static ArrayList<byte[]> encodeArray(Object[] args) {
+    ArrayList<byte[]> res = new ArrayList<>(args.length);
+    for (int i = 0; i < args.length; i++) {
+      res.add(encodeObject(args[i]));
+    }
+    return res;
+  }
+
+}
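
A minimal round trip through the codec, for reference (an illustrative
snippet, not part of the commit):

    byte[] b = EndpointBytesCodec.encodeObject(42L);           // Long -> 8 bytes
    long v = (Long) EndpointBytesCodec.decode(long.class, b);  // v == 42
    // null encodes to an empty byte array; null and zero-length byte[]
    // are deliberately not distinguished.
    ArrayList<byte[]> args =
        EndpointBytesCodec.encodeArray(new Object[] { 1, "a", true });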

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java Thu Apr 17 00:49:17 2014
@@ -19,8 +19,18 @@
  */
 package org.apache.hadoop.hbase.coprocessor.endpoints;
 
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointBytesCodec.IBytesDecoder;
+
 /**
  * The manager holding all endpoint factories in the server.
  *
@@ -29,27 +39,127 @@ public class EndpointManager {
   private static EndpointManager instance = new EndpointManager();
 
   /**
+   * Private constructor to enforce the singleton pattern.
+   */
+  private EndpointManager() {
+  }
+
+  /**
    * Returns the singleton endpoint-manager
    */
   public static EndpointManager get() {
     return instance;
   }
 
-  private ConcurrentHashMap<String, IEndpointFactory<?>> nameFacts = new ConcurrentHashMap<>();
+  /**
+   * Data-structure storing information of an Endpoint.
+   */
+  public static class EndpointInfo {
+    private IEndpointFactory<?> factory;
+    private Map<String, Method> methods = new HashMap<>();
+    private Map<String, IBytesDecoder[]> mthToDecs = new ConcurrentHashMap<>();
+
+    private Object[] decodeParams(String methodKey, ArrayList<byte[]> params) {
+      IBytesDecoder[] decoders = mthToDecs.get(methodKey);
+      Object[] res = new Object[params.size()];
+      for (int i = 0; i < res.length; i++) {
+        res[i] = decoders[i].decode(params.get(i));
+      }
+      return res;
+    }
+
+    private static String makeMethodKey(String methodName, int nParams) {
+      return methodName + "/" + nParams;
+    }
+
+    private static final HashSet<String> IEndpointMethodNames = new HashSet<>();
+    static {
+      for (Method method : IEndpoint.class.getMethods()) {
+        IEndpointMethodNames.add(method.getName());
+      }
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param iEndpoint the class of the IEndpoint interface.
+     * @param factory the factory generating IEndpoint instances.
+     */
+    public EndpointInfo(Class<?> iEndpoint, IEndpointFactory<?> factory) {
+      this.factory = factory;
+
+      for (Method method : iEndpoint.getMethods()) {
+        if (IEndpointMethodNames.contains(method.getName())) {
+          // Ignore methods in IEndpoint
+          continue;
+        }
+
+        Class<?>[] paramsCls = method.getParameterTypes();
+
+        String key = makeMethodKey(method.getName(), paramsCls.length);
+        this.methods.put(key, method);
+
+        EndpointBytesCodec.IBytesDecoder[] decs =
+            new EndpointBytesCodec.IBytesDecoder[paramsCls.length];
+        for (int i = 0; i < paramsCls.length; i++) {
+          IBytesDecoder dec = EndpointBytesCodec.findDecoder(paramsCls[i]);
+          if (dec == null) {
+            throw new UnsupportedTypeException(paramsCls[i]);
+          }
+          decs[i] = dec;
+        }
+        this.mthToDecs.put(key, decs);
+      }
+    }
+
+    /**
+     * Calls factory to create a new instance of IEndpoint.
+     */
+    public IEndpoint createEndpoint() {
+      return factory.create();
+    }
+
+    /**
+     * Invokes a method in the Endpoint.
+     *
+     * @param ep the IEndpoint instance.
+     * @param methodName the name of the method.
+     * @param params the encoded parameters.
+     * @return the encoded return value.
+     */
+    public byte[] invoke(IEndpoint ep, String methodName, ArrayList<byte[]> params)
+        throws IllegalAccessException, IllegalArgumentException,
+        InvocationTargetException, IOException {
+      String methodKey = EndpointInfo.makeMethodKey(methodName, params.size());
+      Method mth = methods.get(methodKey);
+      if (mth == null) {
+        // TODO daviddeng make a special exception for this
+        throw new DoNotRetryIOException("epName." + methodKey
+            + " does not exists");
+      }
+      return EndpointBytesCodec.encodeObject(mth.invoke(ep,
+          decodeParams(methodKey, params)));
+    }
+  }
+
+  private ConcurrentHashMap<String, EndpointInfo> nameFacts =
+      new ConcurrentHashMap<>();
 
   /**
    * Returns the factory of an endpoint.
    */
-  public IEndpointFactory<?> getFactory(String name) {
+  public EndpointInfo getEndpointEntry(String name) {
     return nameFacts.get(name);
-
   }
 
   /**
-   * Register an endpoint with its factory
+   * Registers an endpoint with its factory.
+   *
+   * @throws UnsupportedTypeException if any of the parameter types is not
+   *           supported.
    */
   public <T extends IEndpoint> void register(Class<T> iEndpoint,
       IEndpointFactory<T> factory) {
-    nameFacts.put(iEndpoint.getName(), factory);
+    nameFacts.put(iEndpoint.getName(), new EndpointInfo(iEndpoint, factory));
   }
 }
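
The server-side flow, end to end (a sketch using the names added in this
commit; `Summer` is the implementation from `TestEndpoint`):

    // Registration eagerly builds an EndpointInfo, validating every parameter
    // type and throwing UnsupportedTypeException if one has no codec.
    EndpointManager.get().register(ISummer.class,
        new IEndpointFactory<ISummer>() {
          @Override
          public ISummer create() {
            return new Summer();
          }
        });

    // Dispatch (as in EndpointServer.callEndpoint): look up the EndpointInfo,
    // create an endpoint instance, set its context, then invoke by method
    // name and arity with the still-encoded parameters.
    EndpointManager.EndpointInfo info =
        EndpointManager.get().getEndpointEntry(ISummer.class.getName());
    IEndpoint ep = info.createEndpoint();
    // ... ep.setContext(...) as in EndpointServer ...
    byte[] encodedResult = info.invoke(ep, "sum", params); // params: ArrayList<byte[]>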

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java Thu Apr 17 00:49:17 2014
@@ -19,18 +19,19 @@
  */
 package org.apache.hadoop.hbase.coprocessor.endpoints;
 
-import java.lang.reflect.Method;
+import java.util.ArrayList;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager.EndpointInfo;
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 
 /**
- * A endpoint server.
+ * An endpoint server.
  */
-public class EndpointServer {
+public class EndpointServer implements IEndpointServer {
 
   private HRegionServer server;
 
@@ -38,35 +39,22 @@ public class EndpointServer {
     this.server = server;
   }
 
-  /**
-   * Calls an endpoint on an region server.
-   *
-   * TODO make regionName a list.
-   *
-   * @param epName
-   *          the endpoint name.
-   * @param methodName
-   *          the method name.
-   * @param regionName
-   *          the name of the region
-   * @param startRow
-   *          the start row, inclusive
-   * @param stopRow
-   *          the stop row, exclusive
-   * @return the computed value.
-   */
+  @Override
   public byte[] callEndpoint(String epName, String methodName,
-      final byte[] regionName, final byte[] startRow, final byte[] stopRow)
-      throws ThriftHBaseException {
+      ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow,
+      final byte[] stopRow) throws ThriftHBaseException {
     try {
-      IEndpointFactory<?> fact = EndpointManager.get().getFactory(epName);
-      if (fact == null) {
+      EndpointInfo ent = EndpointManager.get().getEndpointEntry(epName);
+      if (ent == null) {
         // TODO daviddeng make a special exception for this
         throw new DoNotRetryIOException("Endpoint " + epName
             + " does not exists");
       }
-      IEndpoint ep = fact.create();
 
+      // Create an IEndpoint instance.
+      IEndpoint ep = ent.createEndpoint();
+
+      // Set the context.
       ep.setContext(new IEndpointContext() {
         @Override
         public HRegion getRegion() throws NotServingRegionException {
@@ -84,12 +72,10 @@ public class EndpointServer {
         }
       });
 
-      // TODO daviddeng: now we only support methods without any parameters.
-      Method mth = ep.getClass().getMethod(methodName);
-      return (byte[]) mth.invoke(ep);
+      // Invoke the specified method with parameters, the return value is
+      // encoded and returned.
+      return ent.invoke(ep, methodName, params);
     } catch (Exception e) {
-      // TODO daviddeng if the method is not found, should throw
-      // DoNotRetryIOException
       throw new ThriftHBaseException(e);
     }
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java Thu Apr 17 00:49:17 2014
@@ -23,9 +23,9 @@ import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HServerAd
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
-import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * An IEndpointClient served as part of an HTable.
@@ -63,17 +62,20 @@ public class HTableEndpointClient implem
 
     InvocationHandler handler = new InvocationHandler() {
       @Override
-      public Object invoke(Object proxy, final Method method, Object[] args)
-          throws Throwable {
+      public Object invoke(Object proxy, final Method method,
+          final Object[] args) throws Throwable {
         HConnection conn = table.getConnectionAndResetOperationContext();
-        return conn.getRegionServerWithRetries(new ServerCallable<byte[]>(
+        return conn.getRegionServerWithRetries(new ServerCallable<Object>(
             table.getConnection(), table.getTableNameStringBytes(),
             region.getStartKey(), table.getOptions()) {
           @Override
-          public byte[] call() throws IOException {
-            // TODO support arguments
-            return server.callEndpoint(clazz.getName(), method.getName(),
-                region.getRegionName(), startRow, stopRow);
+          public Object call() throws IOException {
+            byte[] res = server.callEndpoint(clazz.getName(), method.getName(),
+                EndpointBytesCodec.encodeArray(args), region.getRegionName(),
+                startRow, stopRow);
+
+            return EndpointBytesCodec.decode(method.getReturnType(),
+                res);
           }
         });
       }
@@ -84,10 +86,10 @@ public class HTableEndpointClient implem
   }
 
   @Override
-  public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(
-      Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller)
+  public <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint(
+      Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller)
       throws IOException {
-    Map<byte[], byte[]> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    Map<HRegionInfo, R> results = new HashMap<>();
 
     NavigableMap<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();
 
@@ -95,7 +97,7 @@ public class HTableEndpointClient implem
       // TODO compute startRow and stopRow
       T ep = getEndpointProxy(clazz, region, HConstants.EMPTY_BYTE_ARRAY,
           HConstants.EMPTY_BYTE_ARRAY);
-      results.put(region.getRegionName(), caller.call(ep));
+      results.put(region, caller.call(ep));
     }
 
     return results;
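
Conceptually, the proxy above turns a plain Java call such as `summer.sum(3)`
into the following wire call (an illustrative expansion, not new API):

    byte[] res = server.callEndpoint(
        ISummer.class.getName(), "sum",                     // endpoint + method
        EndpointBytesCodec.encodeArray(new Object[] { 3 }), // encoded arguments
        region.getRegionName(), startRow, stopRow);
    Long value = (Long) EndpointBytesCodec.decode(long.class, res);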

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java Thu Apr 17 00:49:17 2014
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.coproces
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.HRegionInfo;
+
 /**
  * The interface of a client for calling an endpoint.
  */
@@ -30,10 +32,10 @@ public interface IEndpointClient {
   /**
    * The interface of a caller for <code>coprocessorEndpoint</code>
    *
-   * @param <T>
-   *          The type of the endpoint interface. (NOT the implementation)
+   * @param <T> the type of the endpoint interface. (NOT the implementation)
+   * @param <R> the type of the return value.
    */
-  public interface Caller<T extends IEndpoint> {
+  public interface Caller<T extends IEndpoint, R> {
 
     /**
      * Calls an endpoint.
@@ -42,7 +44,7 @@ public interface IEndpointClient {
      *          an RPC client.
      * @return the result to be put as a value in coprocessorEndpoint's results
      */
-    byte[] call(T client) throws IOException;
+    R call(T client) throws IOException;
   }
 
   /**
@@ -61,6 +63,7 @@ public interface IEndpointClient {
    *          the caller for each region
   * @return a map from region info to results.
    */
-  <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(Class<T> clazz,
-      byte[] startRow, byte[] stopRow, Caller<T> caller) throws IOException;
+  <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint(
+      Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller)
+      throws IOException;
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java?rev=1588118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java Thu Apr 17 00:49:17 2014
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.service.ThriftException;
+import com.facebook.swift.service.ThriftMethod;
+
+/**
+ * The interface of a server executing endpoints.
+ */
+public interface IEndpointServer {
+  /**
+   * Calls an endpoint on a region server.
+   *
+   * TODO make regionName/startRow/stopRow a list.
+   *
+   * @param epName the endpoint name.
+   * @param methodName the method name.
+   * @param regionName the name of the region
+   * @param startRow the start row, inclusive
+   * @param stopRow the stop row, exclusive
+   * @return the computed value.
+   */
+  @ThriftMethod(value = "callEndpoint", exception = {
+      @ThriftException(type = ThriftHBaseException.class, id = 1) })
+  public byte[] callEndpoint(@ThriftField(name = "epName") String epName,
+      @ThriftField(name = "methodName") String methodName,
+      @ThriftField(name = "params") ArrayList<byte[]> params,
+      @ThriftField(name = "regionName") byte[] regionName,
+      @ThriftField(name = "startRow") byte[] startRow,
+      @ThriftField(name = "stopRow") byte[] stopRow)
+      throws ThriftHBaseException;
+}

Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java (from r1588117, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java&r1=1588117&r2=1588118&rev=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java Thu Apr 17 00:49:17 2014
@@ -19,37 +19,19 @@
  */
 package org.apache.hadoop.hbase.coprocessor.endpoints;
 
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
- * The manager holding all endpoint factories in the server.
- *
+ * An exception thrown when the type of a parameter or return value of a method
+ * is not supported.
  */
-public class EndpointManager {
-  private static EndpointManager instance = new EndpointManager();
-
-  /**
-   * Returns the singleton endpoint-manager
-   */
-  public static EndpointManager get() {
-    return instance;
-  }
-
-  private ConcurrentHashMap<String, IEndpointFactory<?>> nameFacts = new ConcurrentHashMap<>();
-
-  /**
-   * Returns the factory of an endpoint.
-   */
-  public IEndpointFactory<?> getFactory(String name) {
-    return nameFacts.get(name);
-
-  }
+public class UnsupportedTypeException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
 
   /**
-   * Register an endpoint with its factory
+   * Constructor.
+   *
+   * @param cls the Class of the unsupported type.
    */
-  public <T extends IEndpoint> void register(Class<T> iEndpoint,
-      IEndpointFactory<T> factory) {
-    nameFacts.put(iEndpoint.getName(), factory);
+  public UnsupportedTypeException(Class<?> cls) {
+    super(cls + " is not supported by endpoint codec.");
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Apr 17 00:49:17 2014
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.ipc;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -65,7 +66,8 @@ public interface HRegionInterface extend
    * @return  the computed value.
    */
   public byte[] callEndpoint(String epName, String methodName,
-      byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException;
+      ArrayList<byte[]> params, byte[] regionName, byte[] startRow,
+      byte[] stopRow) throws IOException;
 
   /**
    * Get metainfo about an HRegion

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java Thu Apr 17 00:49:17 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Ro
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TMultiResponse;
 import org.apache.hadoop.hbase.client.TRowMutations;
+import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointServer;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
 import org.apache.hadoop.hbase.master.AssignmentPlan;
@@ -56,28 +57,8 @@ import com.google.common.util.concurrent
  *
  */
 @ThriftService
-public interface ThriftHRegionInterface extends ThriftClientInterface {
-
-  /**
-   * Calls an endpoint on an region server.
-   *
-   * TODO make regionName/startRow/stopRow a list.
-   *
-   * @param epName  the endpoint name.
-   * @param methodName  the method name.
-   * @param regionName  the name of the region
-   * @param startRow  the start row, inclusive
-   * @param stopRow  the stop row, exclusive
-   * @return  the computed value.
-   */
-  @ThriftMethod(value = "callEndpoint", exception = {
-      @ThriftException(type = ThriftHBaseException.class, id = 1) })
-  public byte[] callEndpoint(@ThriftField(name = "epName") String epName,
-      @ThriftField(name = "methodName") String methodName,
-      @ThriftField(name = "regionName") byte[] regionName,
-      @ThriftField(name = "startRow") byte[] startRow,
-      @ThriftField(name = "stopRow") byte[] stopRow)
-      throws ThriftHBaseException;
+public interface ThriftHRegionInterface extends ThriftClientInterface,
+    IEndpointServer {
 
   /**
    * Get metainfo about an HRegion

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Thu Apr 17 00:49:17 2014
@@ -1189,11 +1189,12 @@ public class HBaseToThriftAdapter implem
 
   @Override
   public byte[] callEndpoint(String epName, String methodName,
-      byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException {
+      ArrayList<byte[]> params, byte[] regionName, byte[] startRow,
+      byte[] stopRow) throws IOException {
     preProcess();
     try {
-      return connection.callEndpoint(epName, methodName, regionName, startRow,
-          stopRow);
+      return connection.callEndpoint(epName, methodName, params, regionName,
+          startRow, stopRow);
     } catch (ThriftHBaseException te) {
       Exception e = te.getServerJavaException();
       handleIOException(e);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Apr 17 00:49:17 2014
@@ -4100,8 +4100,8 @@ public class HRegionServer implements HR
 
   @Override
   public byte[] callEndpoint(String epName, String methodName,
-      final byte[] regionName, final byte[] startRow, final byte[] stopRow)
-      throws IOException {
+      ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow,
+      final byte[] stopRow) throws IOException {
     throw new NotImplementedException("HRegionserver.callEndpoint");
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java Thu Apr 17 00:49:17 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Sc
 import org.apache.hadoop.hbase.client.TMultiResponse;
 import org.apache.hadoop.hbase.client.TRowMutations;
 import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointServer;
+import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointServer;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
 import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
@@ -60,8 +61,6 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 
-import com.facebook.swift.service.ThriftException;
-import com.facebook.swift.service.ThriftMethod;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 
@@ -70,10 +69,10 @@ import com.google.common.util.concurrent
  *
  */
 public class ThriftHRegionServer implements ThriftHRegionInterface {
-  public static Log LOG = LogFactory.getLog(ThriftHRegionServer.class);
+  private static Log LOG = LogFactory.getLog(ThriftHRegionServer.class);
 
   private HRegionServer server;
-  private EndpointServer endpointServer;
+  private IEndpointServer endpointServer;
 
   public ThriftHRegionServer(HRegionServer server) {
     this.server = server;
@@ -639,9 +638,9 @@ public class ThriftHRegionServer impleme
 
   @Override
   public byte[] callEndpoint(String epName, String methodName,
-      final byte[] regionName, final byte[] startRow, final byte[] stopRow)
-      throws ThriftHBaseException {
-    return endpointServer.callEndpoint(epName, methodName, regionName,
+      ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow,
+      final byte[] stopRow) throws ThriftHBaseException {
+    return endpointServer.callEndpoint(epName, methodName, params, regionName,
         startRow, stopRow);
   }
 

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java Thu Apr 17 00:49:17 2014
@@ -23,20 +23,16 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib;
 import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib.IAggregator;
-import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpoint;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient;
 import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointContext;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.StringBytes;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -47,12 +43,15 @@ import org.junit.Test;
  */
 public class TestEndpoint {
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static final byte[] TABLE_NAME = Bytes.toBytes("cp");
+  private static final StringBytes TABLE_NAME = new StringBytes("cp");
   private static final byte[] FAMILY_NAME = Bytes.toBytes("f");
   private static final byte[] QUALITY_NAME = Bytes.toBytes("q");
 
   @Before
   public void setUp() throws Exception {
+    TEST_UTIL.getConfiguration().set(HBaseTestingUtility.FS_TYPE_KEY,
+        HBaseTestingUtility.FS_TYPE_LFS);
+
     TEST_UTIL.startMiniCluster();
     // Register an endpoint in the server side.
     EndpointManager.get().register(ISummer.class,
@@ -75,7 +74,7 @@ public class TestEndpoint {
    *
    */
   public static interface ISummer extends IEndpoint {
-    byte[] sum() throws IOException;
+    long sum(int offset) throws IOException;
   }
 
   /**
@@ -83,6 +82,7 @@ public class TestEndpoint {
    */
   public static class Summer implements ISummer, IAggregator {
     IEndpointContext context;
+    int offset;
     long result;
 
     @Override
@@ -91,21 +91,23 @@ public class TestEndpoint {
     }
 
     @Override
-    public byte[] sum() throws IOException {
+    public long sum(int offset) throws IOException {
       HRegion region = context.getRegion();
       Scan scan = new Scan();
       scan.addFamily(FAMILY_NAME);
       scan.addColumn(FAMILY_NAME, QUALITY_NAME);
 
+      this.offset = offset;
       this.result = 0L;
       EndpointLib.aggregateScan(region, scan, this);
-      return Bytes.toBytes(this.result);
+      return this.result;
     }
 
     @Override
     public void aggregate(KeyValue kv) {
-      this.result += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(),
+      long vl = Bytes.toLong(kv.getBuffer(), kv.getValueOffset(),
           kv.getValueLength());
+      this.result += vl + offset;
     }
   }
 
@@ -120,23 +122,42 @@ public class TestEndpoint {
           QUALITY_NAME, Bytes.toBytes((long) i)));
     }
 
-    // Calling endpoints.
     IEndpointClient cp = (IEndpointClient) table;
-    Map<byte[], byte[]> results = cp.coprocessorEndpoint(ISummer.class, null,
-        null, new Caller<ISummer>() {
+
+    // Calling endpoints with zero offsets.
+    Map<HRegionInfo, Long> results = cp.coprocessorEndpoint(ISummer.class, null,
+        null, new Caller<ISummer, Long>() {
           @Override
-          public byte[] call(ISummer client) throws IOException {
-            return client.sum();
+          public Long call(ISummer client) throws IOException {
+            return client.sum(0);
           }
         });
 
     // Aggregates results from all regions
     long sum = 0;
-    for (byte[] res : results.values()) {
-      sum += Bytes.toLong(res);
+    for (Long res : results.values()) {
+      sum += res;
     }
 
     // Check the final results
     Assert.assertEquals("sum", 55, sum);
+
+    // Calling endpoints with -1 offsets.
+    results = cp.coprocessorEndpoint(ISummer.class, null,
+        null, new Caller<ISummer, Long>() {
+          @Override
+          public Long call(ISummer client) throws IOException {
+            return client.sum(-1);
+          }
+        });
+
+    // Aggregates results from all regions
+    sum = 0;
+    for (Long res : results.values()) {
+      sum += res;
+    }
+
+    // Check the final results
+    Assert.assertEquals("sum", 45, sum);
   }
 }
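
For reference on the expected values: the table is loaded with ten rows
holding 1 through 10 (sum 55). With `offset = -1`, each of the ten aggregated
cells contributes `value + offset`, so the expected total is 55 + 10 * (-1) = 45.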

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java?rev=1588118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java Thu Apr 17 00:49:17 2014
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testcases for EndpointBytesCodec.
+ */
+public class TestEndpointBytesCodec {
+  @Test
+  public void testBasic() throws Exception {
+    final Object[] VALUES = {
+        false, true,
+        (byte) 2,
+        (char) 3,
+        (short) 4,
+        (int) 5,
+        (long) 6,
+        (float) 7.,
+        (double) 8.,
+        Bytes.toBytes("9"),
+    };
+
+    ArrayList<byte[]> arrayBytes = EndpointBytesCodec.encodeArray(VALUES);
+    for (int i = 0; i < VALUES.length; i++) {
+      Object vl = VALUES[i];
+      byte[] bytes = EndpointBytesCodec.encodeObject(vl);
+
+      Assert.assertArrayEquals("element in encodedArray[" + i + "]", bytes,
+          arrayBytes.get(i));
+
+      Object decoded = EndpointBytesCodec.decode(vl.getClass(), bytes);
+      if (vl instanceof byte[]) {
+        Assert.assertArrayEquals(
+            "recoved value of " + Bytes.toString((byte[]) vl),
+            (byte[]) vl, (byte[]) decoded);
+      } else {
+        Assert.assertEquals("recoved value of " + vl, vl, decoded);
+      }
+    }
+
+  }
+}