You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/17 02:49:17 UTC
svn commit: r1588118 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/coprocessor/endpoints/
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/ipc/thrift/ main/java/org...
Author: liyin
Date: Thu Apr 17 00:49:17 2014
New Revision: 1588118
URL: http://svn.apache.org/r1588118
Log:
[master] Support arbitrary parameters and return value in endpoint.
Author: daviddeng
Summary: Implements `EndpointBytesCodec` converting between primitive types and `byte arrays.
Test Plan:
`TestEndpoint` was changed.
`TestEndpointBytesCodec` was added.
Reviewers: adela, gauravm, manukranthk
Reviewed By: adela
CC: hbase-eng@, andrewcox
Differential Revision: https://phabricator.fb.com/D1265851
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java
- copied, changed from r1588117, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Apr 17 00:49:17 2014
@@ -1528,8 +1528,8 @@ getRegionCachePrefetch(new StringBytes(t
}
@Override
- public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(
- Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller)
+ public <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint(
+ Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller)
throws IOException {
return this.endpointClient.coprocessorEndpoint(clazz, startRow, stopRow,
caller);
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java?rev=1588118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java Thu Apr 17 00:49:17 2014
@@ -0,0 +1,258 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Codec for endpoint that convert between types and byte arrays.
+ *
+ * TODO add testcase for this class.
+ */
+public class EndpointBytesCodec {
+
+ /**
+ * The interface of decoder.
+ */
+ public interface IBytesDecoder {
+ /**
+ * Decodes a byte array into an object.
+ */
+ Object decode(byte[] bytes);
+ }
+
+ /**
+ * The interface of encoder.
+ */
+ public interface IBytesEncoder {
+ /**
+ * Encode an object into bytes.
+ */
+ public byte[] encode(Object obj);
+ }
+
+ /**
+ * Mapping from class types to IBytesDecoders.
+ */
+ private static Map<Class<?>, IBytesDecoder> decoders = new HashMap<>();
+
+ /**
+ * Mapping from class types to IBytesEncoders.
+ */
+ private static Map<Class<?>, IBytesEncoder> encoders = new HashMap<>();
+
+ /**
+ * Connects an encoder and a decoder with some classes.
+ *
+ * @param classes All classes in this array will be connected to the codecs.
+ */
+ private static void addCodec(Class<?>[] classes, IBytesEncoder enc,
+ IBytesDecoder dec) {
+ for (Class<?> cls : classes) {
+ encoders.put(cls, enc);
+ decoders.put(cls, dec);
+ }
+ }
+
+ private static final byte[] BYTES_FALSE = { 0 };
+ private static final byte[] BYTES_TRUE = { 1 };
+
+ static {
+ addCodec(new Class<?>[] { byte[].class }, new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return (byte[]) obj;
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ if (param == null) {
+ return HConstants.EMPTY_BYTE_ARRAY;
+ }
+ return param;
+ }
+ });
+ addCodec(new Class<?>[] { String.class }, new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return Bytes.toBytes((String) obj);
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return Bytes.toString(param);
+ }
+ });
+ addCodec(new Class<?>[] { Boolean.class, boolean.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return ((Boolean) obj) ? BYTES_TRUE : BYTES_FALSE;
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return param[0] != 0;
+ }
+ });
+ addCodec(new Class<?>[] { Byte.class, byte.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return new byte[]{(Byte) obj};
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return param[0];
+ }
+ });
+ addCodec(new Class<?>[] { Character.class, char.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return Bytes.toBytes((short) ((Character) obj).charValue());
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return (char) Bytes.toShort(param);
+ }
+ });
+ addCodec(new Class<?>[] { Short.class, short.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return Bytes.toBytes((Short) obj);
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return Bytes.toShort(param);
+ }
+ });
+ addCodec(new Class<?>[] { Integer.class, int.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return Bytes.toBytes((Integer) obj);
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return Bytes.toInt(param);
+ }
+ });
+ addCodec(new Class<?>[] { Long.class, long.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return Bytes.toBytes((Long) obj);
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return Bytes.toLong(param);
+ }
+ });
+ addCodec(new Class<?>[] { Float.class, float.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return Bytes.toBytes((Float) obj);
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return Bytes.toFloat(param);
+ }
+ });
+ addCodec(new Class<?>[] { Double.class, double.class },
+ new IBytesEncoder() {
+ @Override
+ public byte[] encode(Object obj) {
+ return Bytes.toBytes((Double) obj);
+ }
+ }, new IBytesDecoder() {
+ @Override
+ public Object decode(byte[] param) {
+ return Bytes.toDouble(param);
+ }
+ });
+ }
+
+ /**
+ * @returns an IBytesDecoder for a specified type. null is returned if not
+ * supported.
+ */
+ public static IBytesDecoder findDecoder(Class<?> type) {
+ // TODO daviddeng support array of supported types.
+ return decoders.get(type);
+ }
+
+ /**
+ * @return an IBytesEncoder for a specified type. null is returned if not
+ * supported
+ */
+ public static IBytesEncoder findEncoder(Class<?> type) {
+ // TODO daviddeng support array of supported types.
+ return encoders.get(type);
+ }
+
+ /**
+ * Decodes a byte array into an object with specified type.
+ */
+ public static Object decode(Class<?> type, byte[] bytes) {
+ return decoders.get(type).decode(bytes);
+ }
+
+ /**
+ * Encodes an Object into a byte array.
+ */
+ public static byte[] encodeObject(Object obj) {
+ if (obj == null) {
+ // We don't distinguish null and zero-length byte array
+ return HConstants.EMPTY_BYTE_ARRAY;
+ }
+
+ IBytesEncoder enc = findEncoder(obj.getClass());
+ if (enc == null) {
+ new UnsupportedTypeException(obj.getClass());
+ }
+ return enc.encode(obj);
+ }
+
+ /**
+ * Encodes an array of Objects into an ArrayList of byte arrays.
+ */
+ public static ArrayList<byte[]> encodeArray(Object[] args) {
+ ArrayList<byte[]> res = new ArrayList<>(args.length);
+ for (int i = 0; i < args.length; i++) {
+ res.add(encodeObject(args[i]));
+ }
+ return res;
+ }
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java Thu Apr 17 00:49:17 2014
@@ -19,8 +19,18 @@
*/
package org.apache.hadoop.hbase.coprocessor.endpoints;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointBytesCodec.IBytesDecoder;
+
/**
* The manager holding all endpoint factories in the server.
*
@@ -29,27 +39,127 @@ public class EndpointManager {
private static EndpointManager instance = new EndpointManager();
/**
+ * Make constructor private for singleton mode.
+ */
+ private EndpointManager() {
+ }
+
+ /**
* Returns the singleton endpoint-manager
*/
public static EndpointManager get() {
return instance;
}
- private ConcurrentHashMap<String, IEndpointFactory<?>> nameFacts = new ConcurrentHashMap<>();
+ /**
+ * Data-structure storing information of an Endpoint.
+ */
+ public static class EndpointInfo {
+ private IEndpointFactory<?> factory;
+ private Map<String, Method> methods = new HashMap<>();
+ private Map<String, IBytesDecoder[]> mthToDecs = new ConcurrentHashMap<>();
+
+ private Object[] encodeParams(String methodKey, ArrayList<byte[]> params) {
+ IBytesDecoder[] decoders = mthToDecs.get(methodKey);
+ Object[] res = new Object[params.size()];
+ for (int i = 0; i < res.length; i++) {
+ res[i] = decoders[i].decode(params.get(i));
+ }
+ return res;
+ }
+
+ private static String makeMethodKey(String methodName, int nParams) {
+ return methodName + "/" + nParams;
+ }
+
+ private static final HashSet<String> IEndpointMethodNames = new HashSet<>();
+ static {
+ for (Method method : IEndpoint.class.getMethods()) {
+ IEndpointMethodNames.add(method.getName());
+ }
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param iEndpoint the class of the IEndpont instance.
+ * @param factory the factory generating IEndpoint instances.
+ */
+ public EndpointInfo(Class<?> iEndpoint, IEndpointFactory<?> factory) {
+ this.factory = factory;
+
+ for (Method method : iEndpoint.getMethods()) {
+ if (IEndpointMethodNames.contains(method.getName())) {
+ // Ignore methods in IEndpoint
+ continue;
+ }
+
+ Class<?>[] paramsCls = method.getParameterTypes();
+
+ String key = makeMethodKey(method.getName(), paramsCls.length);
+ this.methods.put(key, method);
+
+ EndpointBytesCodec.IBytesDecoder[] decs =
+ new EndpointBytesCodec.IBytesDecoder[paramsCls.length];
+ for (int i = 0; i < paramsCls.length; i++) {
+ IBytesDecoder dec = EndpointBytesCodec.findDecoder(paramsCls[i]);
+ if (dec == null) {
+ throw new UnsupportedTypeException(paramsCls[i]);
+ }
+ decs[i] = dec;
+ }
+ this.mthToDecs.put(key, decs);
+ }
+ }
+
+ /**
+ * Calls factory to create a new instance of IEndpoint.
+ */
+ public IEndpoint createEndpoint() {
+ return factory.create();
+ }
+
+ /**
+ * Invokes a method in the Endpoint.
+ *
+ * @param ep the IEndpoint instance.
+ * @param methodName the name of the methods.
+ * @param params the encoded parameters.
+ * @return the encoded return results.
+ */
+ public byte[] invoke(IEndpoint ep, String methodName, ArrayList<byte[]> params)
+ throws IllegalAccessException, IllegalArgumentException,
+ InvocationTargetException, IOException {
+ String methodKey = EndpointInfo.makeMethodKey(methodName, params.size());
+ Method mth = methods.get(methodKey);
+ if (mth == null) {
+ // TODO daviddeng make a special exception for this
+ throw new DoNotRetryIOException("epName." + methodKey
+ + " does not exists");
+ }
+ return EndpointBytesCodec.encodeObject(mth.invoke(ep,
+ encodeParams(methodKey, params)));
+ }
+ }
+
+ private ConcurrentHashMap<String, EndpointInfo> nameFacts =
+ new ConcurrentHashMap<>();
/**
* Returns the factory of an endpoint.
*/
- public IEndpointFactory<?> getFactory(String name) {
+ public EndpointInfo getEndpointEntry(String name) {
return nameFacts.get(name);
-
}
/**
- * Register an endpoint with its factory
+ * Registers an endpoint with its factory
+ *
+ * @throws UnsupportedTypeException if one of the parameter's type is not
+ * supported.
*/
public <T extends IEndpoint> void register(Class<T> iEndpoint,
IEndpointFactory<T> factory) {
- nameFacts.put(iEndpoint.getName(), factory);
+ nameFacts.put(iEndpoint.getName(), new EndpointInfo(iEndpoint, factory));
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java Thu Apr 17 00:49:17 2014
@@ -19,18 +19,19 @@
*/
package org.apache.hadoop.hbase.coprocessor.endpoints;
-import java.lang.reflect.Method;
+import java.util.ArrayList;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager.EndpointInfo;
import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
/**
- * A endpoint server.
+ * An endpoint server.
*/
-public class EndpointServer {
+public class EndpointServer implements IEndpointServer {
private HRegionServer server;
@@ -38,35 +39,22 @@ public class EndpointServer {
this.server = server;
}
- /**
- * Calls an endpoint on an region server.
- *
- * TODO make regionName a list.
- *
- * @param epName
- * the endpoint name.
- * @param methodName
- * the method name.
- * @param regionName
- * the name of the region
- * @param startRow
- * the start row, inclusive
- * @param stopRow
- * the stop row, exclusive
- * @return the computed value.
- */
+ @Override
public byte[] callEndpoint(String epName, String methodName,
- final byte[] regionName, final byte[] startRow, final byte[] stopRow)
- throws ThriftHBaseException {
+ ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow,
+ final byte[] stopRow) throws ThriftHBaseException {
try {
- IEndpointFactory<?> fact = EndpointManager.get().getFactory(epName);
- if (fact == null) {
+ EndpointInfo ent = EndpointManager.get().getEndpointEntry(epName);
+ if (ent == null) {
// TODO daviddeng make a special exception for this
throw new DoNotRetryIOException("Endpoint " + epName
+ " does not exists");
}
- IEndpoint ep = fact.create();
+ // Create an IEndpoint instance.
+ IEndpoint ep = ent.createEndpoint();
+
+ // Set the context.
ep.setContext(new IEndpointContext() {
@Override
public HRegion getRegion() throws NotServingRegionException {
@@ -84,12 +72,10 @@ public class EndpointServer {
}
});
- // TODO daviddeng: now we only support methods without any parameters.
- Method mth = ep.getClass().getMethod(methodName);
- return (byte[]) mth.invoke(ep);
+ // Invoke the specified method with parameters, the return value is
+ // encoded and returned.
+ return ent.invoke(ep, methodName, params);
} catch (Exception e) {
- // TODO daviddeng if the method is not found, should throw
- // DoNotRetryIOException
throw new ThriftHBaseException(e);
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java Thu Apr 17 00:49:17 2014
@@ -23,9 +23,9 @@ import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.TreeMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
-import org.apache.hadoop.hbase.util.Bytes;
/**
* A IEndpointClient served as part of an HTable.
@@ -63,17 +62,20 @@ public class HTableEndpointClient implem
InvocationHandler handler = new InvocationHandler() {
@Override
- public Object invoke(Object proxy, final Method method, Object[] args)
- throws Throwable {
+ public Object invoke(Object proxy, final Method method,
+ final Object[] args) throws Throwable {
HConnection conn = table.getConnectionAndResetOperationContext();
- return conn.getRegionServerWithRetries(new ServerCallable<byte[]>(
+ return conn.getRegionServerWithRetries(new ServerCallable<Object>(
table.getConnection(), table.getTableNameStringBytes(),
region.getStartKey(), table.getOptions()) {
@Override
- public byte[] call() throws IOException {
- // TODO support arguments
- return server.callEndpoint(clazz.getName(), method.getName(),
- region.getRegionName(), startRow, stopRow);
+ public Object call() throws IOException {
+ byte[] res = server.callEndpoint(clazz.getName(),method.getName(),
+ EndpointBytesCodec.encodeArray(args), region.getRegionName(),
+ startRow, stopRow);
+
+ return EndpointBytesCodec.decode(method.getReturnType(),
+ res);
}
});
}
@@ -84,10 +86,10 @@ public class HTableEndpointClient implem
}
@Override
- public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(
- Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller)
+ public <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint(
+ Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller)
throws IOException {
- Map<byte[], byte[]> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ Map<HRegionInfo, R> results = new HashMap<>();
NavigableMap<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();
@@ -95,7 +97,7 @@ public class HTableEndpointClient implem
// TODO compute startRow and stopRow
T ep = getEndpointProxy(clazz, region, HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY);
- results.put(region.getRegionName(), caller.call(ep));
+ results.put(region, caller.call(ep));
}
return results;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java Thu Apr 17 00:49:17 2014
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.coproces
import java.io.IOException;
import java.util.Map;
+import org.apache.hadoop.hbase.HRegionInfo;
+
/**
* The interface of a client for calling a endpoint.
*/
@@ -30,10 +32,10 @@ public interface IEndpointClient {
/**
* The interface of a caller for <code>coprocessorEndpoint</code>
*
- * @param <T>
- * The type of the endpoint interface. (NOT the implementation)
+ * @param <T> the type of the endpoint interface. (NOT the implementation)
+ * @param <R> the type of the return value.
*/
- public interface Caller<T extends IEndpoint> {
+ public interface Caller<T extends IEndpoint, R> {
/**
* Calls an endpoint.
@@ -42,7 +44,7 @@ public interface IEndpointClient {
* an RPC client.
* @return the result to be put as a value in coprocessorEndpoint's results
*/
- byte[] call(T client) throws IOException;
+ R call(T client) throws IOException;
}
/**
@@ -61,6 +63,7 @@ public interface IEndpointClient {
* the caller for each region
* @return a map from region name to results.
*/
- <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(Class<T> clazz,
- byte[] startRow, byte[] stopRow, Caller<T> caller) throws IOException;
+ <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint(
+ Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller)
+ throws IOException;
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java?rev=1588118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java Thu Apr 17 00:49:17 2014
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.service.ThriftException;
+import com.facebook.swift.service.ThriftMethod;
+
+/**
+ * The interface of a server executing endpoints.
+ */
+public interface IEndpointServer {
+ /**
+ * Calls an endpoint on an region server.
+ *
+ * TODO make regionName/startRow/stopRow a list.
+ *
+ * @param epName the endpoint name.
+ * @param methodName the method name.
+ * @param regionName the name of the region
+ * @param startRow the start row, inclusive
+ * @param stopRow the stop row, exclusive
+ * @return the computed value.
+ */
+ @ThriftMethod(value = "callEndpoint", exception = {
+ @ThriftException(type = ThriftHBaseException.class, id = 1) })
+ public byte[] callEndpoint(@ThriftField(name = "epName") String epName,
+ @ThriftField(name = "methodName") String methodName,
+ @ThriftField(name = "params") ArrayList<byte[]> params,
+ @ThriftField(name = "regionName") byte[] regionName,
+ @ThriftField(name = "startRow") byte[] startRow,
+ @ThriftField(name = "stopRow") byte[] stopRow)
+ throws ThriftHBaseException;
+}
Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java (from r1588117, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java)
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java&r1=1588117&r2=1588118&rev=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java Thu Apr 17 00:49:17 2014
@@ -19,37 +19,19 @@
*/
package org.apache.hadoop.hbase.coprocessor.endpoints;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
- * The manager holding all endpoint factories in the server.
- *
+ * An exception thrown when the type of a parameter or return value of a method
+ * is not supported.
*/
-public class EndpointManager {
- private static EndpointManager instance = new EndpointManager();
-
- /**
- * Returns the singleton endpoint-manager
- */
- public static EndpointManager get() {
- return instance;
- }
-
- private ConcurrentHashMap<String, IEndpointFactory<?>> nameFacts = new ConcurrentHashMap<>();
-
- /**
- * Returns the factory of an endpoint.
- */
- public IEndpointFactory<?> getFactory(String name) {
- return nameFacts.get(name);
-
- }
+public class UnsupportedTypeException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
/**
- * Register an endpoint with its factory
+ * Constructor.
+ *
+ * @param cls the Class of the unsupported type.
*/
- public <T extends IEndpoint> void register(Class<T> iEndpoint,
- IEndpointFactory<T> factory) {
- nameFacts.put(iEndpoint.getName(), factory);
+ public UnsupportedTypeException(Class<?> cls) {
+ super(cls + " is not supported by endpoint codec.");
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Apr 17 00:49:17 2014
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -65,7 +66,8 @@ public interface HRegionInterface extend
* @return the computed value.
*/
public byte[] callEndpoint(String epName, String methodName,
- byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException;
+ ArrayList<byte[]> params, byte[] regionName, byte[] startRow,
+ byte[] stopRow) throws IOException;
/**
* Get metainfo about an HRegion
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java Thu Apr 17 00:49:17 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Ro
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TMultiResponse;
import org.apache.hadoop.hbase.client.TRowMutations;
+import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointServer;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
import org.apache.hadoop.hbase.master.AssignmentPlan;
@@ -56,28 +57,8 @@ import com.google.common.util.concurrent
*
*/
@ThriftService
-public interface ThriftHRegionInterface extends ThriftClientInterface {
-
- /**
- * Calls an endpoint on an region server.
- *
- * TODO make regionName/startRow/stopRow a list.
- *
- * @param epName the endpoint name.
- * @param methodName the method name.
- * @param regionName the name of the region
- * @param startRow the start row, inclusive
- * @param stopRow the stop row, exclusive
- * @return the computed value.
- */
- @ThriftMethod(value = "callEndpoint", exception = {
- @ThriftException(type = ThriftHBaseException.class, id = 1) })
- public byte[] callEndpoint(@ThriftField(name = "epName") String epName,
- @ThriftField(name = "methodName") String methodName,
- @ThriftField(name = "regionName") byte[] regionName,
- @ThriftField(name = "startRow") byte[] startRow,
- @ThriftField(name = "stopRow") byte[] stopRow)
- throws ThriftHBaseException;
+public interface ThriftHRegionInterface extends ThriftClientInterface,
+ IEndpointServer {
/**
* Get metainfo about an HRegion
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Thu Apr 17 00:49:17 2014
@@ -1189,11 +1189,12 @@ public class HBaseToThriftAdapter implem
@Override
public byte[] callEndpoint(String epName, String methodName,
- byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException {
+ ArrayList<byte[]> params, byte[] regionName, byte[] startRow,
+ byte[] stopRow) throws IOException {
preProcess();
try {
- return connection.callEndpoint(epName, methodName, regionName, startRow,
- stopRow);
+ return connection.callEndpoint(epName, methodName, params, regionName,
+ startRow, stopRow);
} catch (ThriftHBaseException te) {
Exception e = te.getServerJavaException();
handleIOException(e);
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Apr 17 00:49:17 2014
@@ -4100,8 +4100,8 @@ public class HRegionServer implements HR
@Override
public byte[] callEndpoint(String epName, String methodName,
- final byte[] regionName, final byte[] startRow, final byte[] stopRow)
- throws IOException {
+ ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow,
+ final byte[] stopRow) throws IOException {
throw new NotImplementedException("HRegionserver.callEndpoint");
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java Thu Apr 17 00:49:17 2014
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.client.TMultiResponse;
import org.apache.hadoop.hbase.client.TRowMutations;
import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointServer;
+import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointServer;
import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
@@ -60,8 +61,6 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
-import com.facebook.swift.service.ThriftException;
-import com.facebook.swift.service.ThriftMethod;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -70,10 +69,10 @@ import com.google.common.util.concurrent
*
*/
public class ThriftHRegionServer implements ThriftHRegionInterface {
- public static Log LOG = LogFactory.getLog(ThriftHRegionServer.class);
+ private static Log LOG = LogFactory.getLog(ThriftHRegionServer.class);
private HRegionServer server;
- private EndpointServer endpointServer;
+ private IEndpointServer endpointServer;
public ThriftHRegionServer(HRegionServer server) {
this.server = server;
@@ -639,9 +638,9 @@ public class ThriftHRegionServer impleme
@Override
public byte[] callEndpoint(String epName, String methodName,
- final byte[] regionName, final byte[] startRow, final byte[] stopRow)
- throws ThriftHBaseException {
- return endpointServer.callEndpoint(epName, methodName, regionName,
+ ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow,
+ final byte[] stopRow) throws ThriftHBaseException {
+ return endpointServer.callEndpoint(epName, methodName, params, regionName,
startRow, stopRow);
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java?rev=1588118&r1=1588117&r2=1588118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java Thu Apr 17 00:49:17 2014
@@ -23,20 +23,16 @@ import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib;
import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib.IAggregator;
-import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpoint;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient;
import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointContext;
-import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.StringBytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -47,12 +43,15 @@ import org.junit.Test;
*/
public class TestEndpoint {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static final byte[] TABLE_NAME = Bytes.toBytes("cp");
+ private static final StringBytes TABLE_NAME = new StringBytes("cp");
private static final byte[] FAMILY_NAME = Bytes.toBytes("f");
private static final byte[] QUALITY_NAME = Bytes.toBytes("q");
@Before
public void setUp() throws Exception {
+ TEST_UTIL.getConfiguration().set(HBaseTestingUtility.FS_TYPE_KEY,
+ HBaseTestingUtility.FS_TYPE_LFS);
+
TEST_UTIL.startMiniCluster();
// Register an endpoint in the server side.
EndpointManager.get().register(ISummer.class,
@@ -75,7 +74,7 @@ public class TestEndpoint {
*
*/
public static interface ISummer extends IEndpoint {
- byte[] sum() throws IOException;
+ long sum(int offset) throws IOException;
}
/**
@@ -83,6 +82,7 @@ public class TestEndpoint {
*/
public static class Summer implements ISummer, IAggregator {
IEndpointContext context;
+ int offset;
long result;
@Override
@@ -91,21 +91,23 @@ public class TestEndpoint {
}
@Override
- public byte[] sum() throws IOException {
+ public long sum(int offset) throws IOException {
HRegion region = context.getRegion();
Scan scan = new Scan();
scan.addFamily(FAMILY_NAME);
scan.addColumn(FAMILY_NAME, QUALITY_NAME);
+ this.offset = offset;
this.result = 0L;
EndpointLib.aggregateScan(region, scan, this);
- return Bytes.toBytes(this.result);
+ return this.result;
}
@Override
public void aggregate(KeyValue kv) {
- this.result += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(),
+ long vl = Bytes.toLong(kv.getBuffer(), kv.getValueOffset(),
kv.getValueLength());
+ this.result += vl + offset;
}
}
@@ -120,23 +122,42 @@ public class TestEndpoint {
QUALITY_NAME, Bytes.toBytes((long) i)));
}
- // Calling endpoints.
IEndpointClient cp = (IEndpointClient) table;
- Map<byte[], byte[]> results = cp.coprocessorEndpoint(ISummer.class, null,
- null, new Caller<ISummer>() {
+
+ // Calling endpoints with zero offsets.
+ Map<HRegionInfo, Long> results = cp.coprocessorEndpoint(ISummer.class, null,
+ null, new Caller<ISummer, Long>() {
@Override
- public byte[] call(ISummer client) throws IOException {
- return client.sum();
+ public Long call(ISummer client) throws IOException {
+ return client.sum(0);
}
});
// Aggregates results from all regions
long sum = 0;
- for (byte[] res : results.values()) {
- sum += Bytes.toLong(res);
+ for (Long res : results.values()) {
+ sum += res;
}
// Check the final results
Assert.assertEquals("sum", 55, sum);
+
+ // Calling endpoints with -1 offsets.
+ results = cp.coprocessorEndpoint(ISummer.class, null,
+ null, new Caller<ISummer, Long>() {
+ @Override
+ public Long call(ISummer client) throws IOException {
+ return client.sum(-1);
+ }
+ });
+
+ // Aggregates results from all regions
+ sum = 0;
+ for (Long res : results.values()) {
+ sum += res;
+ }
+
+ // Check the final results
+ Assert.assertEquals("sum", 45, sum);
}
}
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java?rev=1588118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java Thu Apr 17 00:49:17 2014
@@ -0,0 +1,65 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.endpoints;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Testcases for EndpointBytesCodec.
+ */
+public class TestEndpointBytesCodec {
+ @Test
+ public void testBasic() throws Exception {
+ final Object[] VALUES = {
+ false, true,
+ (byte) 2,
+ (char) 3,
+ (short) 4,
+ (int) 5,
+ (long) 6,
+ (float) 7.,
+ (double) 8.,
+ Bytes.toBytes("9"),
+ };
+
+ ArrayList<byte[]> arrayBytes = EndpointBytesCodec.encodeArray(VALUES);
+ for (int i = 0; i < VALUES.length; i++) {
+ Object vl = VALUES[i];
+ byte[] bytes = EndpointBytesCodec.encodeObject(vl);
+
+ Assert.assertArrayEquals("element in encodedArray[" + i + "]", bytes,
+ arrayBytes.get(i));
+
+ Object decoded = EndpointBytesCodec.decode(vl.getClass(), bytes);
+ if (vl instanceof byte[]) {
+ Assert.assertArrayEquals(
+ "recoved value of " + Bytes.toString((byte[]) vl),
+ (byte[]) vl, (byte[]) decoded);
+ } else {
+ Assert.assertEquals("recoved value of " + vl, vl, decoded);
+ }
+ }
+
+ }
+}