You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/02 23:03:32 UTC

svn commit: r1584179 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/ipc/ main/java/org/apache/hadoop/hbase/ipc/thrift/ main/java/org/apache/ha...

Author: liyin
Date: Wed Apr  2 21:03:31 2014
New Revision: 1584179

URL: http://svn.apache.org/r1584179
Log:
[master] First commit of Endpoint implementation.

Author: daviddeng

Summary:
An endpoint is a method running on the server. Some aggregation work can be done on the server and only final results are returned back to the client.

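Current endpoints have the following restrictions, which may be removed later:
 - Endpoint methods cannot take parameters (see the TODO in `EndpointServer.callEndpoint`).
 - Endpoint methods must return `byte[]`.
 - The client does not yet compute per-region start/stop rows; empty arrays are always sent (see the TODO in `HTableEndpointClient.coprocessorEndpoint`).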

An example of aggregation is shown in `TestEndpoint`

Test Plan: `TestEndpoint`

Reviewers: adela, manukranthk, aaiyer, fan, liyintang, gauravm

Reviewed By: adela

CC: hbase-eng@, andrewcox

Differential Revision: https://phabricator.fb.com/D1223428

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointLib.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/HTableEndpointClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpoint.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointContext.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointFactory.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/TestEndpoint.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/HBase.thrift
    hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/gen_thrift_from_swift.sh

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Apr  2 21:03:31 2014
@@ -46,12 +46,14 @@ import org.apache.hadoop.hbase.HServerAd
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.coprocessor.HTableEndpointClient;
+import org.apache.hadoop.hbase.coprocessor.IEndpoint;
+import org.apache.hadoop.hbase.coprocessor.IEndpointClient;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
-import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.Pair;
@@ -69,7 +71,7 @@ import com.google.common.base.Preconditi
  *
  * See {@link HBaseAdmin} to create, drop, list, enable and disable tables.
  */
-public class HTable implements HTableInterface {
+public class HTable implements HTableInterface, IEndpointClient {
   private final HConnection connection;
   private final byte [] tableName;
   protected final int scannerTimeout;
@@ -88,6 +90,8 @@ public class HTable implements HTableInt
   private HTableAsync hta;
   private boolean doAsync;
 
+  private IEndpointClient endpointClient;
+
   // Share this multiaction thread pool across all the HTable instance;
   // The total number of threads will be bounded #HTable * #RegionServer.
   static ExecutorService multiActionThreadPool =
@@ -192,6 +196,8 @@ public class HTable implements HTableInt
         HConstants.HTABLE_ASYNC_CALLS_DEFAULT)
         && configuration.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
             HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
+
+    this.endpointClient = new HTableEndpointClient(this);
   }
 
   /**
@@ -1445,4 +1451,13 @@ public class HTable implements HTableInt
   public HBaseRPCOptions getOptions() {
     return options;
   }
+
+  @Override
+  public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(
+      Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller)
+      throws IOException {
+    return this.endpointClient.coprocessorEndpoint(clazz, startRow, stopRow,
+        caller);
+  }
+
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointLib.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointLib.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointLib.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointLib.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,62 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+
+/**
+ * Some algorithm library for implementing endpoints.
+ *
+ * Currently implemented:
+ * aggregateScan - aggregates the key-values selected by a Scan
+ */
+public class EndpointLib {
+  /**
+   * An interface for aggregateScan
+   */
+  interface IAggregator {
+    void aggregate(KeyValue kv);
+  }
+
+  /**
+   * Aggregates all KeyValue's in a region defined by a Scan.
+   */
+  public static void aggregateScan(HRegion region, Scan scan, IAggregator aggr)
+      throws IOException {
+    try (InternalScanner scanner = region.getScanner(scan)) {
+      ArrayList<KeyValue> kvs = new ArrayList<>();
+      boolean hasMore = true;
+      while (hasMore) {
+        kvs.clear();
+        hasMore = scanner.next(kvs);
+        for (KeyValue kv : kvs) {
+          aggr.aggregate(kv);
+        }
+      }
+    }
+  }
+
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointManager.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointManager.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointManager.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The manager holding all endpoint factories in the server.
+ *
+ */
+public class EndpointManager {
+  private static EndpointManager instance = new EndpointManager();
+
+  /**
+   * Returns the singleton endpoint-manager
+   */
+  public static EndpointManager get() {
+    return instance;
+  }
+
+  private ConcurrentHashMap<String, IEndpointFactory<?>> nameFacts = new ConcurrentHashMap<>();
+
+  /**
+   * Returns the factory of an endpoint.
+   */
+  public IEndpointFactory<?> getFactory(String name) {
+    return nameFacts.get(name);
+
+  }
+
+  /**
+   * Register an endpoint with its factory
+   */
+  public <T extends IEndpoint> void register(Class<T> iEndpoint,
+      IEndpointFactory<T> factory) {
+    nameFacts.put(iEndpoint.getName(), factory);
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointServer.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointServer.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/EndpointServer.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,96 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+
+/**
+ * An endpoint server.
+ */
+public class EndpointServer {
+
+  private HRegionServer server;
+
+  public EndpointServer(HRegionServer server) {
+    this.server = server;
+  }
+
+  /**
+   * Calls an endpoint on a region server.
+   *
+   * TODO make regionName a list.
+   *
+   * @param epName
+   *          the endpoint name.
+   * @param methodName
+   *          the method name.
+   * @param regionName
+   *          the name of the region
+   * @param startRow
+   *          the start row, inclusive
+   * @param stopRow
+   *          the stop row, exclusive
+   * @return the computed value.
+   */
+  public byte[] callEndpoint(String epName, String methodName,
+      final byte[] regionName, final byte[] startRow, final byte[] stopRow)
+      throws ThriftHBaseException {
+    try {
+      IEndpointFactory<?> fact = EndpointManager.get().getFactory(epName);
+      if (fact == null) {
+        // TODO daviddeng make a special exception for this
+        throw new DoNotRetryIOException("Endpoint " + epName
+            + " does not exists");
+      }
+      IEndpoint ep = fact.create();
+
+      ep.setContext(new IEndpointContext() {
+        @Override
+        public HRegion getRegion() throws NotServingRegionException {
+          return EndpointServer.this.server.getRegion(regionName);
+        }
+
+        @Override
+        public byte[] getStartRow() {
+          return startRow;
+        }
+
+        @Override
+        public byte[] getStopRow() {
+          return stopRow;
+        }
+      });
+
+      // TODO daviddeng: now we only support methods without any parameters.
+      Method mth = ep.getClass().getMethod(methodName);
+      return (byte[]) mth.invoke(ep);
+    } catch (Exception e) {
+      // TODO daviddeng if the method is not found, should throw
+      // DoNotRetryIOException
+      throw new ThriftHBaseException(e);
+    }
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/HTableEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/HTableEndpointClient.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/HTableEndpointClient.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/HTableEndpointClient.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * An IEndpointClient served as part of an HTable.
+ */
+public class HTableEndpointClient implements IEndpointClient {
+  private HTable table;
+
+  public HTableEndpointClient(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Returns a proxy instance for an IEndpoint.
+   *
+   * @param clazz
+   *          the class of the endpoint interface to call.
+   * @param region
+   *          the region info
+   * @param startRow
+   *          the start row
+   * @param stopRow
+   *          the end row
+   */
+  @SuppressWarnings("unchecked")
+  protected <T extends IEndpoint> T getEndpointProxy(final Class<T> clazz,
+      final HRegionInfo region, final byte[] startRow, final byte[] stopRow) {
+
+    InvocationHandler handler = new InvocationHandler() {
+      @Override
+      public Object invoke(Object proxy, final Method method, Object[] args)
+          throws Throwable {
+        HConnection conn = table.getConnectionAndResetOperationContext();
+        return conn.getRegionServerWithRetries(new ServerCallable<byte[]>(
+            table.getConnection(), table.getTableName(), region.getStartKey(),
+            table.getOptions()) {
+          @Override
+          public byte[] call() throws IOException {
+            // TODO support arguments
+            return server.callEndpoint(clazz.getName(), method.getName(),
+                region.getRegionName(), startRow, stopRow);
+          }
+        });
+      }
+    };
+
+    return (T) Proxy.newProxyInstance(clazz.getClassLoader(),
+        new Class<?>[] { clazz }, handler);
+  }
+
+  @Override
+  public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(
+      Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller)
+      throws IOException {
+    Map<byte[], byte[]> results = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+    NavigableMap<HRegionInfo, HServerAddress> regions = table.getRegionsInfo();
+
+    for (final HRegionInfo region : regions.keySet()) {
+      // TODO compute startRow and stopRow
+      T ep = getEndpointProxy(clazz, region, HConstants.EMPTY_BYTE_ARRAY,
+          HConstants.EMPTY_BYTE_ARRAY);
+      results.put(region.getRegionName(), caller.call(ep));
+    }
+
+    return results;
+  }
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpoint.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpoint.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpoint.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+/**
+ * The common parent of all endpoint interfaces.
+ */
+public interface IEndpoint {
+
+  /**
+   * Called to set the endpoint context. The implementation can save the
+   * instance.
+   *
+   * @param context
+   *          a non-null IEndpointContext
+   */
+  void setContext(IEndpointContext context);
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointClient.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointClient.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointClient.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * The interface of a client for calling an endpoint.
+ */
+public interface IEndpointClient {
+
+  /**
+   * The interface of a caller for <code>coprocessorEndpoint</code>
+   *
+   * @param <T>
+   *          The type of the endpoint interface. (NOT the implementation)
+   */
+  public interface Caller<T extends IEndpoint> {
+
+    /**
+     * Calls an endpoint.
+     *
+     * @param client
+     *          an RPC client.
+     * @return the result to be put as a value in coprocessorEndpoint's results
+     */
+    byte[] call(T client) throws IOException;
+  }
+
+  /**
+   * Calls an endpoint in the server side and returns results.
+   *
+   * The <tt>caller</tt> is called for every region, provided with an RPC client
+   * of the same type as the endpoint interface.
+   *
+   * @param clazz
+   *          the class of the endpoint interface.
+   * @param startRow
+   *          the start row. null or empty array means no limit on start.
+   * @param stopRow
+   *          the stop row. null or empty array means no limit on stop.
+   * @param caller
+   *          the caller for each region
+   * @return a map from region name to results.
+   */
+  <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(Class<T> clazz,
+      byte[] startRow, byte[] stopRow, Caller<T> caller) throws IOException;
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointContext.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointContext.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointContext.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointContext.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+/**
+ * The context of an endpoint calling.
+ *
+ * TODO add more functions if necessary for accessing more resources on the server.
+ */
+public interface IEndpointContext {
+  /**
+   * Returns an HRegion instance.
+   *
+   * @throws NotServingRegionException
+   *           if the region is not served on this server.
+   */
+  HRegion getRegion() throws NotServingRegionException;
+
+  /**
+   * The start row, inclusive, within this region of this call.
+   */
+  byte[] getStartRow();
+
+  /**
+   * The stop row, exclusive, within this region of this call.
+   */
+  byte[] getStopRow();
+}

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointFactory.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointFactory.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/IEndpointFactory.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,33 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+/**
+ * The factory for generating IEndpoint instances.
+ *
+ * @param <T>
+ *          The type of the endpoint interface
+ */
+public interface IEndpointFactory<T extends IEndpoint> {
+  /**
+   * Creates a new instance of the endpoint.
+   */
+  T create();
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Wed Apr  2 21:03:31 2014
@@ -50,6 +50,22 @@ import org.apache.hadoop.io.MapWritable;
  */
 public interface HRegionInterface extends HBaseRPCProtocolVersion, Restartable,
     Stoppable, ThriftClientInterface {
+
+  /**
+   * Calls an endpoint on a region server.
+   *
+   * TODO make regionName a list.
+   *
+   * @param epName      the endpoint name.
+   * @param methodName  the method name.
+   * @param regionName  the name of the region
+   * @param startRow    the start row, inclusive
+   * @param stopRow     the stop row, exclusive
+   * @return  the computed value.
+   */
+  public byte[] callEndpoint(String epName, String methodName,
+      byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException;
+
   /**
    * Get metainfo about an HRegion
    *
@@ -415,9 +431,11 @@ public interface HRegionInterface extend
    * Stop this service.
    * @param why Why we're stopping.
    */
+  @Override
   public void stop(String why);
 
   /** @return why we are stopping */
+  @Override
   public String getStopReason();
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java Wed Apr  2 21:03:31 2014
@@ -56,6 +56,28 @@ import com.google.common.util.concurrent
  */
 @ThriftService
 public interface ThriftHRegionInterface extends ThriftClientInterface {
+
+  /**
+   * Calls an endpoint on a region server.
+   *
+   * TODO make regionName/startRow/stopRow a list.
+   *
+   * @param epName  the endpoint name.
+   * @param methodName  the method name.
+   * @param regionName  the name of the region
+   * @param startRow  the start row, inclusive
+   * @param stopRow  the stop row, exclusive
+   * @return  the computed value.
+   */
+  @ThriftMethod(value = "callEndpoint", exception = {
+      @ThriftException(type = ThriftHBaseException.class, id = 1) })
+  public byte[] callEndpoint(@ThriftField(name = "epName") String epName,
+      @ThriftField(name = "methodName") String methodName,
+      @ThriftField(name = "regionName") byte[] regionName,
+      @ThriftField(name = "startRow") byte[] startRow,
+      @ThriftField(name = "stopRow") byte[] stopRow)
+      throws ThriftHBaseException;
+
   /**
    * Get metainfo about an HRegion
    *
@@ -71,7 +93,7 @@ public interface ThriftHRegionInterface 
 
   /**
    * Return all the data for the row that matches <i>row</i> exactly,
-   * or the one that immediately preceeds it.
+   * or the one that immediately precedes it.
    *
    * @param regionName region name
    * @param row row key

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Wed Apr  2 21:03:31 2014
@@ -1172,4 +1172,25 @@ public class HBaseToThriftAdapter implem
       postProcess();
     }
   }
+
+  @Override
+  public byte[] callEndpoint(String epName, String methodName,
+      byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException {
+    preProcess();
+    try {
+      return connection.callEndpoint(epName, methodName, regionName, startRow,
+          stopRow);
+    } catch (ThriftHBaseException te) {
+      Exception e = te.getServerJavaException();
+      handleIOException(e);
+      LOG.warn("Unexpected Exception: " + e);
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      refreshConnectionAndThrowIOException(e);
+      return null;
+    } finally {
+      postProcess();
+    }
+  }
+
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Apr  2 21:03:31 2014
@@ -66,6 +66,7 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang.mutable.MutableDouble;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -531,6 +532,7 @@ public class HRegionServer implements HR
    * regionserver object would be unpublished at that point.
    * @throws IOException
    */
+  @SuppressWarnings("unchecked")
   public void initialize() throws IOException {
     this.restartRequested = false;
     this.abortRequested = false;
@@ -3500,7 +3502,7 @@ public class HRegionServer implements HR
    * @return {@link HRegion} for <code>regionName</code>
    * @throws NotServingRegionException
    */
-  protected HRegion getRegion(final byte [] regionName)
+  public HRegion getRegion(final byte[] regionName)
   throws NotServingRegionException {
     HRegion region = null;
     this.lock.readLock().lock();
@@ -4076,6 +4078,12 @@ public class HRegionServer implements HR
 
   @Override
   public void close() throws Exception {
-    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public byte[] callEndpoint(String epName, String methodName,
+      final byte[] regionName, final byte[] startRow, final byte[] stopRow)
+      throws IOException {
+    throw new NotImplementedException("HRegionserver.callEndpoint");
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java Wed Apr  2 21:03:31 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.client.Ro
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TMultiResponse;
 import org.apache.hadoop.hbase.client.TRowMutations;
+import org.apache.hadoop.hbase.coprocessor.EndpointServer;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
 import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface;
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
@@ -65,13 +66,15 @@ import com.google.common.util.concurrent
  * This is just a wrapper around {@link HRegionServer}
  *
  */
-public class ThriftHRegionServer implements ThriftHRegionInterface{
+public class ThriftHRegionServer implements ThriftHRegionInterface {
   public static Log LOG = LogFactory.getLog(ThriftHRegionServer.class);
 
   private HRegionServer server;
+  private EndpointServer endpointServer;
 
   public ThriftHRegionServer(HRegionServer server) {
     this.server = server;
+    this.endpointServer = new EndpointServer(this.server);
   }
 
   @Override
@@ -630,4 +633,12 @@ public class ThriftHRegionServer impleme
       throw new ThriftHBaseException(e);
     }
   }
+
+  @Override
+  public byte[] callEndpoint(String epName, String methodName,
+      final byte[] regionName, final byte[] startRow, final byte[] stopRow)
+      throws ThriftHBaseException {
+    return endpointServer.callEndpoint(epName, methodName, regionName,
+        startRow, stopRow);
+  }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Wed Apr  2 21:03:31 2014
@@ -566,7 +566,7 @@ public class Bytes {
    * @return the long value
    */
   public static long toLong(byte[] bytes) {
-    return toLong(bytes, 0, SIZEOF_LONG);
+    return toLong(bytes, 0);
   }
 
   /**
@@ -578,7 +578,15 @@ public class Bytes {
    * @return the long value
    */
   public static long toLong(byte[] bytes, int offset) {
-    return toLong(bytes, offset, SIZEOF_LONG);
+    if (offset + SIZEOF_LONG > bytes.length) {
+      throw explainWrongLengthOrOffset(bytes, offset, SIZEOF_LONG, SIZEOF_LONG);
+    }
+    long l = 0;
+    for (int i = offset; i < offset + SIZEOF_LONG; i++) {
+      l <<= 8;
+      l ^= bytes[i] & 0xFF;
+    }
+    return l;
   }
 
   /**
@@ -592,15 +600,10 @@ public class Bytes {
    * if there's not enough room in the array at the offset indicated.
    */
   public static long toLong(byte[] bytes, int offset, final int length) {
-    if (length != SIZEOF_LONG || offset + length > bytes.length) {
+    if (length != SIZEOF_LONG) {
       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_LONG);
     }
-    long l = 0;
-    for(int i = offset; i < offset + length; i++) {
-      l <<= 8;
-      l ^= bytes[i] & 0xFF;
-    }
-    return l;
+    return toLong(bytes, offset);
   }
 
   private static IllegalArgumentException
@@ -656,7 +659,7 @@ public class Bytes {
    * @return Float made from passed byte array.
    */
   public static float toFloat(byte [] bytes, int offset) {
-    return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
+    return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_FLOAT));
   }
 
   /**
@@ -692,7 +695,7 @@ public class Bytes {
    * @return Return double made from passed bytes.
    */
   public static double toDouble(final byte [] bytes, final int offset) {
-    return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_LONG));
+    return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_DOUBLE));
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/HBase.thrift
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/HBase.thrift?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/HBase.thrift (original)
+++ hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/HBase.thrift Wed Apr  2 21:03:31 2014
@@ -192,6 +192,7 @@ struct Bucket {
 service ThriftHRegionInterface {
   void bulkLoadHFile(1: string hfilePath, 2: binary regionName, 3: binary familyName) throws (1: ThriftHBaseException ex1);
   void bulkLoadHFileSeqNum(1: string hfilePath, 2: binary regionName, 3: binary familyName, 4: bool assignSeqNum) throws (1: ThriftHBaseException ex1);
+  binary callEndpoint(1: string epName, 2: string methodName, 3: binary regionName, 4: binary startRow, 5: binary stopRow) throws (1: ThriftHBaseException ex1);
   bool checkAndDelete(1: binary regionName, 2: binary row, 3: binary family, 4: binary qualifier, 5: binary value, 6: Delete deleteArg) throws (1: ThriftHBaseException ex1);
   bool checkAndPut(1: binary regionName, 2: binary row, 3: binary family, 4: binary qualifier, 5: binary value, 6: Put put) throws (1: ThriftHBaseException ex1);
   void close(1: i64 scannerId) throws (1: ThriftHBaseException ex1);

Modified: hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/gen_thrift_from_swift.sh
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/gen_thrift_from_swift.sh?rev=1584179&r1=1584178&r2=1584179&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/gen_thrift_from_swift.sh (original)
+++ hbase/branches/0.89-fb/src/main/resources/org/apache/hadoop/hbase/thrift/gen_thrift_from_swift.sh Wed Apr  2 21:03:31 2014
@@ -8,4 +8,7 @@ then
 fi
 export CLASSPATH=$CLASSPATH:$1:$2:$3
 
-java com.facebook.swift.generator.swift2thrift.Main -allow_multiple_packages org.apache.hadoop.hbase org.apache.hadoop.hbase.KeyValue org.apache.hadoop.hbase.client.Put org.apache.hadoop.hbase.io.TimeRange org.apache.hadoop.hbase.filter.TFilter org.apache.hadoop.hbase.client.Get org.apache.hadoop.hbase.client.MultiPut org.apache.hadoop.hbase.client.Delete org.apache.hadoop.hbase.client.Scan org.apache.hadoop.hbase.HColumnDescriptor org.apache.hadoop.hbase.HTableDescriptor org.apache.hadoop.hbase.HRegionInfo org.apache.hadoop.hbase.client.MultiPutResponse org.apache.hadoop.hbase.client.Result org.apache.hadoop.hbase.HServerAddress 'org.apache.hadoop.hbase.HServerLoad$RegionLoad' org.apache.hadoop.hbase.HServerLoad org.apache.hadoop.hbase.HServerInfo org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException org.apache.hadoop.hbase.client.MultiAction 'org.apache.hadoop.hbase.client.IntegerOrResultOrException$Type' org.apache.hadoop.hbase.client.IntegerOrResultOrException org.apache.hadoop.hbase.client.TMultiResponse org.apache.hadoop.hbase.client.TRowMutations org.apache.hadoop.hbase.master.AssignmentPlan org.apache.hadoop.hbase.client.RowLock 'org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram$HFileStat' 'org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram$Bucket' org.apache.hadoop.hbase.ipc.ThriftHRegionInterface -out HBase.thrift
+
+dirpath=`dirname $0`
+
+$JAVA_HOME/bin/java com.facebook.swift.generator.swift2thrift.Main -allow_multiple_packages org.apache.hadoop.hbase org.apache.hadoop.hbase.KeyValue org.apache.hadoop.hbase.client.Put org.apache.hadoop.hbase.io.TimeRange org.apache.hadoop.hbase.filter.TFilter org.apache.hadoop.hbase.client.Get org.apache.hadoop.hbase.client.MultiPut org.apache.hadoop.hbase.client.Delete org.apache.hadoop.hbase.client.Scan org.apache.hadoop.hbase.HColumnDescriptor org.apache.hadoop.hbase.HTableDescriptor org.apache.hadoop.hbase.HRegionInfo org.apache.hadoop.hbase.client.MultiPutResponse org.apache.hadoop.hbase.client.Result org.apache.hadoop.hbase.HServerAddress 'org.apache.hadoop.hbase.HServerLoad$RegionLoad' org.apache.hadoop.hbase.HServerLoad org.apache.hadoop.hbase.HServerInfo org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException org.apache.hadoop.hbase.client.MultiAction 'org.apache.hadoop.hbase.client.IntegerOrResultOrException$Type' org.apache.hadoop.hbase.client.IntegerOrResultOrException org.apache.hadoop.hbase.client.TMultiResponse org.apache.hadoop.hbase.client.TRowMutations org.apache.hadoop.hbase.master.AssignmentPlan org.apache.hadoop.hbase.client.RowLock 'org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram$HFileStat' 'org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram$Bucket' org.apache.hadoop.hbase.ipc.ThriftHRegionInterface -out "$dirpath/HBase.thrift"

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/TestEndpoint.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/TestEndpoint.java?rev=1584179&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/TestEndpoint.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/TestEndpoint.java Wed Apr  2 21:03:31 2014
@@ -0,0 +1,136 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.EndpointLib.IAggregator;
+import org.apache.hadoop.hbase.coprocessor.IEndpointClient.Caller;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Testcase for endpoint
+ */
+public class TestEndpoint {
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final byte[] TABLE_NAME = Bytes.toBytes("cp");
+  private static final byte[] FAMILY_NAME = Bytes.toBytes("f");
+  private static final byte[] QUALITY_NAME = Bytes.toBytes("q");
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster();
+    // Register an endpoint on the server side.
+    EndpointManager.get().register(ISummer.class,
+        new IEndpointFactory<ISummer>() {
+          @Override
+          public ISummer create() {
+            return new Summer();
+          }
+        });
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * This is an example of an endpoint interface. It computes the total sum of
+   * all the values of family FAMILY_NAME at qualifier QUALITY_NAME as longs.
+   *
+   */
+  public static interface ISummer extends IEndpoint {
+    byte[] sum() throws IOException;
+  }
+
+  /**
+   * The implementation of ISummer.
+   */
+  public static class Summer implements ISummer, IAggregator {
+    IEndpointContext context;
+    long result;
+
+    @Override
+    public void setContext(IEndpointContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public byte[] sum() throws IOException {
+      HRegion region = context.getRegion();
+      Scan scan = new Scan();
+      scan.addFamily(FAMILY_NAME);
+      scan.addColumn(FAMILY_NAME, QUALITY_NAME);
+
+      this.result = 0L;
+      EndpointLib.aggregateScan(region, scan, this);
+      return Bytes.toBytes(this.result);
+    }
+
+    @Override
+    public void aggregate(KeyValue kv) {
+      this.result += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(),
+          kv.getValueLength());
+    }
+  }
+
+  @Test
+  public void testSummer() throws Exception {
+    // Create the table
+    HTableInterface table = TEST_UTIL.createTable(TABLE_NAME, FAMILY_NAME);
+
+    // Put some values
+    for (int i = 1; i <= 10; i++) {
+      table.put(new Put(Bytes.toBytes("row" + i)).add(FAMILY_NAME,
+          QUALITY_NAME, Bytes.toBytes((long) i)));
+    }
+
+    // Calling endpoints.
+    IEndpointClient cp = (IEndpointClient) table;
+    Map<byte[], byte[]> results = cp.coprocessorEndpoint(ISummer.class, null,
+        null, new Caller<ISummer>() {
+          @Override
+          public byte[] call(ISummer client) throws IOException {
+            return client.sum();
+          }
+        });
+
+    // Aggregates results from all regions
+    long sum = 0;
+    for (byte[] res : results.values()) {
+      sum += Bytes.toLong(res);
+    }
+
+    // Check the final results
+    Assert.assertEquals("sum", 55, sum);
+  }
+}