You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/10/04 05:16:43 UTC

[30/51] [partial] hbase git commit: HBASE-15638 Shade protobuf Which includes

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
new file mode 100644
index 0000000..f5d2a20
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
@@ -0,0 +1,563 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.TestServerCustomProtocol;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ClassLoaderTestHelper;
+import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.RegionLoad;
+
+import java.io.*;
+import java.util.*;
+
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Test coprocessors class loading.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestClassLoading {
+  private static final Log LOG = LogFactory.getLog(TestClassLoading.class);
+  // Shared test utility; setUpBeforeClass() starts its mini cluster.
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  // DFS cluster obtained from TEST_UTIL; assigned in setUpBeforeClass().
+  private static MiniDFSCluster cluster;
+
+  // Table that several tests drop (if present) and recreate with their own coprocessor attributes.
+  static final TableName tableName = TableName.valueOf("TestClassLoading");
+  // Class names handed to buildCoprocessorJar() to generate throwaway coprocessor jars.
+  static final String cpName1 = "TestCP1";
+  static final String cpName2 = "TestCP2";
+  static final String cpName3 = "TestCP3";
+  static final String cpName4 = "TestCP4";
+  static final String cpName5 = "TestCP5";
+  static final String cpName6 = "TestCP6";
+
+  // Coprocessor classes registered through the cluster configuration in setUpBeforeClass().
+  private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
+  // TODO: Fix the import of this handler.  It is coming in from a package that is far away.
+  private static Class<?> regionCoprocessor2 = TestServerCustomProtocol.PingHandler.class;
+  private static Class<?> regionServerCoprocessor = SampleRegionWALObserver.class;
+  private static Class<?> masterCoprocessor = BaseMasterObserver.class;
+
+  // Initial expectation used by assertAllRegionServers() for the reported coprocessors.
+  private static final String[] regionServerSystemCoprocessors =
+      new String[]{
+      regionServerCoprocessor.getSimpleName()
+  };
+
+  // Alternative expectation that switchExpectedCoprocessors() toggles to.
+  private static final String[] masterRegionServerSystemCoprocessors = new String[] {
+      regionCoprocessor1.getSimpleName(), MultiRowMutationEndpoint.class.getSimpleName(),
+      regionServerCoprocessor.getSimpleName() };
+
+  /**
+   * Registers the coprocessors under test in the shared configuration, starts the mini
+   * cluster and remembers its DFS cluster for the tests that upload jars.
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    final Configuration configuration = TEST_UTIL.getConfiguration();
+
+    // Registered for any table (user or meta), so regionCoprocessor1 is
+    // loaded on all regionservers.
+    final String allRegionsCp = regionCoprocessor1.getName();
+    configuration.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, allRegionsCp);
+
+    // Registered for user tables only: regionCoprocessor2 is loaded only on
+    // regionservers that serve a user table region, i.e. nowhere as long as
+    // no user table is loaded.
+    final String userRegionsCp = regionCoprocessor2.getName();
+    configuration.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY, userRegionsCp);
+
+    final String walCp = regionServerCoprocessor.getName();
+    configuration.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY, walCp);
+
+    final String masterCp = masterCoprocessor.getName();
+    configuration.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, masterCp);
+
+    TEST_UTIL.startMiniCluster(1);
+    cluster = TEST_UTIL.getDFSCluster();
+  }
+
+  /** Shuts down the mini cluster that setUpBeforeClass() started. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  static File buildCoprocessorJar(String className) throws Exception {
+    String code = "import org.apache.hadoop.hbase.coprocessor.*;" +
+      "public class " + className + " extends BaseRegionObserver {}";
+    return ClassLoaderTestHelper.buildJar(
+      TEST_UTIL.getDataTestDir().toString(), className, code);
+  }
+
+  /**
+   * HBASE-3516: Test CP Class loading from HDFS.
+   *
+   * Uploads two generated coprocessor jars to the DFS root, creates a table that
+   * references them, and then checks that every region of the table loaded both
+   * coprocessors (the second with its k1/k2/k3 settings) and that the class loaders
+   * the regions use are the cached ones.
+   */
+  @Test
+  public void testClassLoadingFromHDFS() throws Exception {
+    final FileSystem dfs = cluster.getFileSystem();
+    final String dfsRoot = dfs.getUri().toString() + Path.SEPARATOR;
+
+    final File localJar1 = buildCoprocessorJar(cpName1);
+    final File localJar2 = buildCoprocessorJar(cpName2);
+
+    // upload the first jar and make sure it arrived
+    dfs.copyFromLocalFile(new Path(localJar1.getPath()), new Path(dfsRoot));
+    final String jarFileOnHDFS1 = dfsRoot + localJar1.getName();
+    final Path pathOnHDFS1 = new Path(jarFileOnHDFS1);
+    assertTrue("Copy jar file to HDFS failed.", dfs.exists(pathOnHDFS1));
+    LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
+
+    // upload the second jar and make sure it arrived
+    dfs.copyFromLocalFile(new Path(localJar2.getPath()), new Path(dfsRoot));
+    final String jarFileOnHDFS2 = dfsRoot + localJar2.getName();
+    final Path pathOnHDFS2 = new Path(jarFileOnHDFS2);
+    assertTrue("Copy jar file to HDFS failed.", dfs.exists(pathOnHDFS2));
+    LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2);
+
+    // describe a table whose attributes reference both coprocessors
+    final HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("test"));
+    // first coprocessor: no configuration values
+    final String plainSpec = jarFileOnHDFS1 + "|" + cpName1 + "|" + Coprocessor.PRIORITY_USER;
+    htd.setValue("COPROCESSOR$1", plainSpec);
+    // second coprocessor: carries configuration values
+    final String configuredSpec = jarFileOnHDFS2 + "|" + cpName2 + "|"
+        + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3";
+    htd.setValue("COPROCESSOR$2", configuredSpec);
+
+    // drop any leftover table before recreating it
+    final Admin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+    CoprocessorClassLoader.clearCache();
+    final byte[] firstKey = {10, 63};
+    final byte[] lastKey = {12, 43};
+    admin.createTable(htd, firstKey, lastKey, 4);
+    waitForTable(htd.getTableName());
+
+    // walk the table's regions; each "all*" flag stays true only if it holds for every region
+    boolean sawTableRegion = false;
+    boolean allHaveCp1 = true;
+    boolean allHaveCp2 = true;
+    boolean allHaveK1 = true;
+    boolean allHaveK2 = true;
+    boolean allHaveK3 = true;
+    final Map<Region, Set<ClassLoader>> loadersByRegion =
+        new HashMap<Region, Set<ClassLoader>>();
+    final MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+    final String tablePrefix = tableName.getNameAsString();
+    for (Region region : hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+      if (!region.getRegionInfo().getRegionNameAsString().startsWith(tablePrefix)) {
+        continue;
+      }
+      sawTableRegion = true;
+
+      final CoprocessorEnvironment env1 =
+          region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
+      allHaveCp1 &= (env1 != null);
+
+      final CoprocessorEnvironment env2 =
+          region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
+      allHaveCp2 &= (env2 != null);
+      if (env2 == null) {
+        allHaveK1 = false;
+        allHaveK2 = false;
+        allHaveK3 = false;
+      } else {
+        final Configuration cpConf = env2.getConfiguration();
+        allHaveK1 = allHaveK1 && cpConf.get("k1") != null;
+        allHaveK2 = allHaveK2 && cpConf.get("k2") != null;
+        allHaveK3 = allHaveK3 && cpConf.get("k3") != null;
+      }
+
+      loadersByRegion.put(region,
+          ((CoprocessorHost) region.getCoprocessorHost()).getExternalClassLoaders());
+    }
+
+    assertTrue("No region was found for table " + tableName, sawTableRegion);
+    assertTrue("Class " + cpName1 + " was missing on a region", allHaveCp1);
+    assertTrue("Class " + cpName2 + " was missing on a region", allHaveCp2);
+    assertTrue("Configuration key 'k1' was missing on a region", allHaveK1);
+    assertTrue("Configuration key 'k2' was missing on a region", allHaveK2);
+    assertTrue("Configuration key 'k3' was missing on a region", allHaveK3);
+
+    // both jars must have a cached class loader ...
+    assertNotNull(jarFileOnHDFS1 + " was not cached",
+        CoprocessorClassLoader.getIfCached(pathOnHDFS1));
+    assertNotNull(jarFileOnHDFS2 + " was not cached",
+        CoprocessorClassLoader.getIfCached(pathOnHDFS2));
+    // ... and exactly one class loader per external jar
+    assertEquals("The number of cached classloaders should be equal to the number" +
+        " of external jar files",
+        2, CoprocessorClassLoader.getAllCached().size());
+
+    // every class loader a region uses must be one of the cached ones
+    final Set<ClassLoader> cachedLoaders =
+        new HashSet<ClassLoader>(CoprocessorClassLoader.getAllCached());
+    for (Map.Entry<Region, Set<ClassLoader>> entry : loadersByRegion.entrySet()) {
+      final String failure = "Some CP classloaders for region " + entry.getKey()
+          + " are not cached."
+          + " ClassLoader Cache:" + cachedLoaders
+          + " Region ClassLoaders:" + entry.getValue();
+      assertTrue(failure, cachedLoaders.containsAll(entry.getValue()));
+    }
+  }
+
+  private String getLocalPath(File file) {
+    return new Path(file.toURI()).toString();
+  }
+
+  /**
+   * HBASE-3516: Test CP Class loading from local file system.
+   *
+   * Creates a table named after the coprocessor whose attribute points at a locally
+   * built jar, then checks that the table's region has the coprocessor loaded.
+   */
+  @Test
+  public void testClassLoadingFromLocalFS() throws Exception {
+    final File localJar = buildCoprocessorJar(cpName3);
+
+    // table descriptor referencing the local jar
+    final HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cpName3));
+    htd.addFamily(new HColumnDescriptor("test"));
+    final String spec = getLocalPath(localJar) + "|" + cpName3 + "|" + Coprocessor.PRIORITY_USER;
+    htd.setValue("COPROCESSOR$1", spec);
+
+    final Admin admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(htd);
+    waitForTable(htd.getTableName());
+
+    // the result reflects the last region examined whose name starts with the table name
+    boolean loaded = false;
+    final MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+    for (Region region : hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+      final String regionName = region.getRegionInfo().getRegionNameAsString();
+      if (!regionName.startsWith(cpName3)) {
+        continue;
+      }
+      final Coprocessor coprocessor = region.getCoprocessorHost().findCoprocessor(cpName3);
+      loaded = coprocessor != null;
+    }
+    assertTrue("Class " + cpName3 + " was missing on a region", loaded);
+  }
+
+  /**
+   * HBASE-6308: Test CP classloader is the CoprocessorClassLoader.
+   *
+   * Creates a table whose coprocessor attribute points at a locally built jar and
+   * verifies that the loaded coprocessor instance was defined by a
+   * CoprocessorClassLoader.
+   */
+  @Test
+  public void testPrivateClassLoader() throws Exception {
+    File jarFile = buildCoprocessorJar(cpName4);
+
+    // create a table that references the jar
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cpName4));
+    htd.addFamily(new HColumnDescriptor("test"));
+    htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName4 + "|" +
+      Coprocessor.PRIORITY_USER);
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(htd);
+    waitForTable(htd.getTableName());
+
+    // verify that the coprocessor was loaded correctly
+    boolean found = false;
+    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+    for (Region region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+      if (region.getRegionInfo().getRegionNameAsString().startsWith(cpName4)) {
+        Coprocessor cp = region.getCoprocessorHost().findCoprocessor(cpName4);
+        if (cp != null) {
+          found = true;
+          // JUnit's argument order is (message, expected, actual); keeping it straight
+          // makes a failure report the unexpected loader class as the "actual" value.
+          assertEquals("Class " + cpName4 + " was not loaded by CoprocessorClassLoader",
+            CoprocessorClassLoader.class, cp.getClass().getClassLoader().getClass());
+        }
+      }
+    }
+    assertTrue("Class " + cpName4 + " was missing on a region", found);
+  }
+
+  /**
+   * HBase-3810: Registering a Coprocessor at HTableDescriptor should be
+   * less strict.
+   *
+   * Registers five coprocessors on one table: three through raw table attributes
+   * whose keys and values vary in case, padding and optional fields, and two through
+   * htd.addCoprocessor(). It then verifies that all of them are loaded and that the
+   * last one sees exactly the configuration keys it was given.
+   */
+  @Test
+  public void testHBase3810() throws Exception {
+    // allowed value pattern: [path] | class name | [priority] | [key values]
+
+    File jarFile1 = buildCoprocessorJar(cpName1);
+    File jarFile2 = buildCoprocessorJar(cpName2);
+    File jarFile5 = buildCoprocessorJar(cpName5);
+    File jarFile6 = buildCoprocessorJar(cpName6);
+
+    String cpKey1 = "COPROCESSOR$1";
+    String cpKey2 = " Coprocessor$2 ";
+    String cpKey3 = " coprocessor$03 ";
+
+    String cpValue1 = getLocalPath(jarFile1) + "|" + cpName1 + "|" +
+        Coprocessor.PRIORITY_USER;
+    String cpValue2 = getLocalPath(jarFile2) + " | " + cpName2 + " | ";
+    // load from default class loader
+    String cpValue3 =
+        " | org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver | | k=v ";
+
+    // create a table that references the jar
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("test"));
+
+    // add 3 coprocessors by setting htd attributes directly.
+    htd.setValue(cpKey1, cpValue1);
+    htd.setValue(cpKey2, cpValue2);
+    htd.setValue(cpKey3, cpValue3);
+
+    // add 2 coprocessor by using new htd.addCoprocessor() api
+    htd.addCoprocessor(cpName5, new Path(getLocalPath(jarFile5)),
+        Coprocessor.PRIORITY_USER, null);
+    Map<String, String> kvs = new HashMap<String, String>();
+    kvs.put("k1", "v1");
+    kvs.put("k2", "v2");
+    kvs.put("k3", "v3");
+    htd.addCoprocessor(cpName6, new Path(getLocalPath(jarFile6)),
+        Coprocessor.PRIORITY_USER, kvs);
+
+    // drop any leftover table from another test before recreating it
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+    admin.createTable(htd);
+    waitForTable(htd.getTableName());
+
+    // verify that the coprocessor was loaded
+    boolean found_2 = false, found_1 = false, found_3 = false,
+        found_5 = false, found_6 = false;
+    boolean found6_k1 = false, found6_k2 = false, found6_k3 = false,
+        found6_k4 = false;
+
+    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+    for (Region region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+      if (region.getRegionInfo().getRegionNameAsString().startsWith(tableName.getNameAsString())) {
+        found_1 = found_1 ||
+            (region.getCoprocessorHost().findCoprocessor(cpName1) != null);
+        found_2 = found_2 ||
+            (region.getCoprocessorHost().findCoprocessor(cpName2) != null);
+        found_3 = found_3 ||
+            (region.getCoprocessorHost().findCoprocessor("SimpleRegionObserver")
+                != null);
+        found_5 = found_5 ||
+            (region.getCoprocessorHost().findCoprocessor(cpName5) != null);
+
+        CoprocessorEnvironment env =
+            region.getCoprocessorHost().findCoprocessorEnvironment(cpName6);
+        if (env != null) {
+          found_6 = true;
+          Configuration conf = env.getConfiguration();
+          found6_k1 = conf.get("k1") != null;
+          found6_k2 = conf.get("k2") != null;
+          found6_k3 = conf.get("k3") != null;
+          // "k4" was never put into kvs; look it up so the assertFalse below
+          // actually checks something instead of testing a constant.
+          found6_k4 = found6_k4 || conf.get("k4") != null;
+        }
+      }
+    }
+
+    assertTrue("Class " + cpName1 + " was missing on a region", found_1);
+    assertTrue("Class " + cpName2 + " was missing on a region", found_2);
+    assertTrue("Class SimpleRegionObserver was missing on a region", found_3);
+    assertTrue("Class " + cpName5 + " was missing on a region", found_5);
+    assertTrue("Class " + cpName6 + " was missing on a region", found_6);
+
+    assertTrue("Configuration key 'k1' was missing on a region", found6_k1);
+    assertTrue("Configuration key 'k2' was missing on a region", found6_k2);
+    assertTrue("Configuration key 'k3' was missing on a region", found6_k3);
+    assertFalse("Configuration key 'k4' wasn't configured", found6_k4);
+  }
+
+  /** Runs loadingClassFromLibDirInJar() with the inner jars under the prefix "/lib/". */
+  @Test
+  public void testClassLoadingFromLibDirInJar() throws Exception {
+    loadingClassFromLibDirInJar("/lib/");
+  }
+
+  /** Runs loadingClassFromLibDirInJar() with the inner jars under the prefix "lib/". */
+  @Test
+  public void testClassLoadingFromRelativeLibDirInJar() throws Exception {
+    loadingClassFromLibDirInJar("lib/");
+  }
+
+  void loadingClassFromLibDirInJar(String libPrefix) throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+
+    File innerJarFile1 = buildCoprocessorJar(cpName1);
+    File innerJarFile2 = buildCoprocessorJar(cpName2);
+    File outerJarFile = new File(TEST_UTIL.getDataTestDir().toString(), "outer.jar");
+
+    ClassLoaderTestHelper.addJarFilesToJar(
+      outerJarFile, libPrefix, innerJarFile1, innerJarFile2);
+
+    // copy the jars into dfs
+    fs.copyFromLocalFile(new Path(outerJarFile.getPath()),
+      new Path(fs.getUri().toString() + Path.SEPARATOR));
+    String jarFileOnHDFS = fs.getUri().toString() + Path.SEPARATOR +
+      outerJarFile.getName();
+    assertTrue("Copy jar file to HDFS failed.",
+      fs.exists(new Path(jarFileOnHDFS)));
+    LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS);
+
+    // create a table that references the coprocessors
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("test"));
+      // without configuration values
+    htd.setValue("COPROCESSOR$1", jarFileOnHDFS.toString() + "|" + cpName1 +
+      "|" + Coprocessor.PRIORITY_USER);
+      // with configuration values
+    htd.setValue("COPROCESSOR$2", jarFileOnHDFS.toString() + "|" + cpName2 +
+      "|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    if (admin.tableExists(tableName)) {
+      if (admin.isTableEnabled(tableName)) {
+        admin.disableTable(tableName);
+      }
+      admin.deleteTable(tableName);
+    }
+    admin.createTable(htd);
+    waitForTable(htd.getTableName());
+
+    // verify that the coprocessors were loaded
+    boolean found1 = false, found2 = false, found2_k1 = false,
+        found2_k2 = false, found2_k3 = false;
+    MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+    for (Region region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+      if (region.getRegionInfo().getRegionNameAsString().startsWith(tableName.getNameAsString())) {
+        CoprocessorEnvironment env;
+        env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
+        if (env != null) {
+          found1 = true;
+        }
+        env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
+        if (env != null) {
+          found2 = true;
+          Configuration conf = env.getConfiguration();
+          found2_k1 = conf.get("k1") != null;
+          found2_k2 = conf.get("k2") != null;
+          found2_k3 = conf.get("k3") != null;
+        }
+      }
+    }
+    assertTrue("Class " + cpName1 + " was missing on a region", found1);
+    assertTrue("Class " + cpName2 + " was missing on a region", found2);
+    assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
+    assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
+    assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
+  }
+
+  /**
+   * Checks the coprocessors reported by the regionservers. Passing null makes
+   * assertAllRegionServers() look at all online servers rather than at the
+   * servers hosting one table.
+   */
+  @Test
+  public void testRegionServerCoprocessorsReported() throws Exception {
+    // This was a test for HBASE-4070.
+    // We are removing coprocessors from region load in HBASE-5258.
+    // Therefore, this test now only checks system coprocessors.
+    assertAllRegionServers(null);
+  }
+
+  /**
+   * Returns the subset of all online regionservers (as a map from ServerName to
+   * ServerLoad) that host at least one region of the given table. Used by
+   * assertAllRegionServers() below to test reporting of loaded coprocessors.
+   *
+   * A region is attributed to the table when its name starts with the table name,
+   * which is the same prefix test the other lookups in this class use.
+   * NOTE(review): this assumes RegionLoad.getNameAsString() yields the full region
+   * name rather than the bare table name; confirm against the RegionLoad API.
+   *
+   * @param tableName : given table.
+   * @return a new map holding the matching servers; empty when none match.
+   */
+  Map<ServerName, ServerLoad> serversForTable(String tableName) {
+    Map<ServerName, ServerLoad> serverLoadHashMap =
+        new HashMap<ServerName, ServerLoad>();
+    for(Map.Entry<ServerName,ServerLoad> server:
+        TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
+            getOnlineServers().entrySet()) {
+      for( Map.Entry<byte[], RegionLoad> region:
+          server.getValue().getRegionsLoad().entrySet()) {
+        // An exact equals() against the table name would not match a name that
+        // carries anything after the table name, so compare by prefix.
+        if (region.getValue().getNameAsString().startsWith(tableName)) {
+          // this server hosts a region of tableName: add this server..
+          serverLoadHashMap.put(server.getKey(),server.getValue());
+          // .. and skip the rest of the regions that it hosts.
+          break;
+        }
+      }
+    }
+    return serverLoadHashMap;
+  }
+
+  /**
+   * Asserts that the coprocessors reported by the regionservers match the expected
+   * set, retrying up to five times with a one second pause between attempts.
+   *
+   * The expectation starts as regionServerSystemCoprocessors and is toggled through
+   * switchExpectedCoprocessors() after every server that is examined, whether or not
+   * the comparison matched.
+   *
+   * @param tableName restricts the check to servers hosting a region of this table;
+   *                  null means all online servers
+   * @throws InterruptedException if interrupted while sleeping between attempts
+   */
+  void assertAllRegionServers(String tableName) throws InterruptedException {
+    String[] actualCoprocessors = null;
+    boolean success = false;
+    String[] expectedCoprocessors = regionServerSystemCoprocessors;
+    for (int i = 0; i < 5; i++) {
+      // Look the servers up again on every attempt: serversForTable() returns a
+      // snapshot map, so fetching it once before the loop would make each retry
+      // compare the same stale ServerLoad objects.
+      Map<ServerName, ServerLoad> servers;
+      if (tableName == null) {
+        // if no tableName specified, use all servers.
+        servers = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().getOnlineServers();
+      } else {
+        servers = serversForTable(tableName);
+      }
+      boolean anyFailed = false;
+      for(Map.Entry<ServerName,ServerLoad> server: servers.entrySet()) {
+        actualCoprocessors = server.getValue().getRsCoprocessors();
+        if (!Arrays.equals(actualCoprocessors, expectedCoprocessors)) {
+          LOG.debug("failed comparison: actual: " +
+              Arrays.toString(actualCoprocessors) +
+              " ; expected: " + Arrays.toString(expectedCoprocessors));
+          anyFailed = true;
+          expectedCoprocessors = switchExpectedCoprocessors(expectedCoprocessors);
+          break;
+        }
+        expectedCoprocessors = switchExpectedCoprocessors(expectedCoprocessors);
+      }
+      if (!anyFailed) {
+        success = true;
+        break;
+      }
+      LOG.debug("retrying after failed comparison: " + i);
+      Thread.sleep(1000);
+    }
+    assertTrue("Reported regionserver coprocessors never matched the expected set;"
+        + " last reported: " + Arrays.toString(actualCoprocessors), success);
+  }
+
+  private String[] switchExpectedCoprocessors(String[] expectedCoprocessors) {
+    if (Arrays.equals(regionServerSystemCoprocessors, expectedCoprocessors)) {
+      expectedCoprocessors = masterRegionServerSystemCoprocessors;
+    } else {
+      expectedCoprocessors = regionServerSystemCoprocessors;
+    }
+    return expectedCoprocessors;
+  }
+
+  /**
+   * HBASE 4070: Improve region server metrics to report loaded coprocessors
+   * to master. Verifies that the master reports exactly the coprocessor
+   * configured in setUpBeforeClass().
+   */
+  @Test
+  public void testMasterCoprocessorsReported() {
+    final String expected = "[" + masterCoprocessor.getSimpleName() + "]";
+    final String[] reported = TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessors();
+    final String actual = Arrays.toString(reported);
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * HBASE 12277: looks up the master's coprocessors by the MasterObserver type and
+   * expects a non-empty result whose first entry is the configured master coprocessor.
+   */
+  @Test
+  public void testFindCoprocessors() {
+    CoprocessorHost masterCpHost =
+        TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost();
+
+    List<MasterObserver> masterObservers = masterCpHost.findCoprocessors(MasterObserver.class);
+
+    assertNotNull(masterObservers);
+    assertFalse(masterObservers.isEmpty());
+
+    final String expectedName = masterCoprocessor.getSimpleName();
+    final String firstObserverName = masterObservers.get(0).getClass().getSimpleName();
+    assertEquals(expectedName, firstObserverName);
+  }
+
+  private void waitForTable(TableName name) throws InterruptedException, IOException {
+    // First wait until all regions are online
+    TEST_UTIL.waitTableEnabled(name);
+    // Now wait a bit longer for the coprocessor hosts to load the CPs
+    Thread.sleep(1000);
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
new file mode 100644
index 0000000..7e2577a
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * TestEndpoint: test cases to verify coprocessor Endpoint.
+ * <p>
+ * The class starts a mini cluster with {@code ColumnAggregationEndpoint} and
+ * {@code ProtobufCoprocessorService} configured as region coprocessors and
+ * {@code ProtobufCoprocessorService} configured as a master coprocessor. The test table
+ * is created with two split keys ({@code ROWS[rowSeperator1]} and
+ * {@code ROWS[rowSeperator2]}) and loaded with {@code ROWSIZE} rows; the value stored in
+ * row {@code i} is the integer {@code i}.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestCoprocessorEndpoint {
+  private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
+
+  private static final TableName TEST_TABLE =
+      TableName.valueOf("TestCoprocessorEndpoint");
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+  private static final byte[] ROW = Bytes.toBytes("testRow");
+
+  private static final int ROWSIZE = 20;
+  private static final int rowSeperator1 = 5;
+  private static final int rowSeperator2 = 12;
+  private static final byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Configures the coprocessors, starts the mini cluster, creates the pre-split test
+   * table and writes one cell per row.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // set configure to indicate which cp should be loaded
+    Configuration conf = util.getConfiguration();
+    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
+        ProtobufCoprocessorService.class.getName());
+    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+        ProtobufCoprocessorService.class.getName());
+    util.startMiniCluster(2);
+    Admin admin = util.getHBaseAdmin();
+    HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
+    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+    admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+    util.waitUntilAllRegionsAssigned(TEST_TABLE);
+
+    // try-with-resources so the table is closed even if one of the puts fails
+    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
+      for (int i = 0; i < ROWSIZE; i++) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+        table.put(put);
+      }
+    }
+  }
+
+  /** Shuts the mini cluster down once all tests in this class have run. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Invokes the column aggregation service's {@code sum} call over the given row range.
+   *
+   * @param table table to run the coprocessor call against
+   * @param family column family placed in the request
+   * @param qualifier column qualifier; only set on the request when non-null and non-empty
+   * @param start start row passed to {@code coprocessorService}
+   * @param end end row passed to {@code coprocessorService}
+   * @return the map returned by {@code coprocessorService}, holding one sum per entry
+   */
+  private Map<byte [], Long> sum(final Table table, final byte [] family,
+      final byte [] qualifier, final byte [] start, final byte [] end)
+  throws ServiceException, Throwable {
+    return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
+        start, end,
+      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
+        @Override
+        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
+        throws IOException {
+          CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
+              new CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
+          ColumnAggregationProtos.SumRequest.Builder builder =
+            ColumnAggregationProtos.SumRequest.newBuilder();
+          builder.setFamily(ByteStringer.wrap(family));
+          if (qualifier != null && qualifier.length > 0) {
+            builder.setQualifier(ByteStringer.wrap(qualifier));
+          }
+          instance.sum(null, builder.build(), rpcCallback);
+          return rpcCallback.get().getSum();
+        }
+      });
+  }
+
+  /** Logs every entry of a coprocessor result map, keyed by region name. */
+  private static void logResults(Map<byte[], ?> results) {
+    for (Map.Entry<byte[], ?> e : results.entrySet()) {
+      LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+    }
+  }
+
+  /** Adds up the values of a result map; kept as a long to avoid silent narrowing. */
+  private static long total(Map<byte[], Long> results) {
+    long total = 0;
+    for (Long value : results.values()) {
+      total += value;
+    }
+    return total;
+  }
+
+  /** Sum of the values written by setupBeforeClass for rows firstRow..ROWSIZE-1. */
+  private static long expectedSum(int firstRow) {
+    long expected = 0;
+    for (int i = firstRow; i < ROWSIZE; i++) {
+      expected += i;
+    }
+    return expected;
+  }
+
+  /**
+   * Sums the test column over all rows and then over the rows starting at the first
+   * split key, and compares each with the sum of the values that were written.
+   */
+  @Test
+  public void testAggregation() throws Throwable {
+    // try-with-resources so the table is closed even when an assertion fails
+    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
+      Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+        ROWS[0], ROWS[ROWS.length-1]);
+      logResults(results);
+      assertEquals("Invalid result", expectedSum(0), total(results));
+
+      // scan: for region 2 and region 3
+      results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+        ROWS[rowSeperator1], ROWS[ROWS.length-1]);
+      logResults(results);
+      assertEquals("Invalid result", expectedSum(rowSeperator1), total(results));
+    }
+  }
+
+  /**
+   * Calls the echo service through {@code coprocessorService} with a callback, first
+   * over the full row range (expecting one result per region, three in total) and then
+   * from the first split key onwards (expecting two results).
+   */
+  @Test
+  public void testCoprocessorService() throws Throwable {
+    final TestProtos.EchoRequestProto request =
+        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+    final Map<byte[], String> results = Collections.synchronizedMap(
+        new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
+    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
+      List<HRegionLocation> regions;
+      try(RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
+        regions = rl.getAllRegionLocations();
+      }
+      final RpcController controller = new ServerRpcController();
+
+      // The same call and callback objects are used for both row ranges below.
+      final Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>
+          echoCall =
+          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
+            @Override
+            public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+                throws IOException {
+              LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
+              CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
+                  new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>();
+              instance.echo(controller, request, callback);
+              TestProtos.EchoResponseProto response = callback.get();
+              LOG.debug("Batch.Call returning result " + response);
+              return response;
+            }
+          };
+      final Batch.Callback<TestProtos.EchoResponseProto> recordEcho =
+          new Batch.Callback<TestProtos.EchoResponseProto>() {
+            @Override
+            public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
+              assertNotNull(result);
+              assertEquals("hello", result.getMessage());
+              results.put(region, result.getMessage());
+            }
+          };
+
+      // scan: for all regions
+      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+          ROWS[0], ROWS[ROWS.length - 1], echoCall, recordEcho);
+      logResults(results);
+      assertEquals(3, results.size());
+      for (HRegionLocation info : regions) {
+        LOG.info("Region info is "+info.getRegionInfo().getRegionNameAsString());
+        assertTrue(results.containsKey(info.getRegionInfo().getRegionName()));
+      }
+      results.clear();
+
+      // scan: for region 2 and region 3
+      table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+          ROWS[rowSeperator1], ROWS[ROWS.length - 1], echoCall, recordEcho);
+      logResults(results);
+      assertEquals(2, results.size());
+    }
+  }
+
+  /**
+   * Verifies that a {@code Batch.Call} returning null still produces one (null-valued)
+   * map entry per region.
+   */
+  @Test
+  public void testCoprocessorServiceNullResponse() throws Throwable {
+    final TestProtos.EchoRequestProto request =
+        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
+      List<HRegionLocation> regions;
+      try(RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
+        regions = rl.getAllRegionLocations();
+      }
+      // scan: for all regions
+      final RpcController controller = new ServerRpcController();
+      // test that null results are supported
+      Map<byte[], String> results =
+            table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+          ROWS[0], ROWS[ROWS.length - 1],
+          new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
+            @Override
+            public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+                throws IOException {
+              CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
+                  new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>();
+              instance.echo(controller, request, callback);
+              TestProtos.EchoResponseProto response = callback.get();
+              LOG.debug("Batch.Call got result " + response);
+              return null;
+            }
+          }
+      );
+      logResults(results);
+      assertEquals(3, results.size());
+      for (HRegionLocation region : regions) {
+        HRegionInfo info = region.getRegionInfo();
+        LOG.info("Region info is "+info.getRegionNameAsString());
+        assertTrue(results.containsKey(info.getRegionName()));
+        assertNull(results.get(info.getRegionName()));
+      }
+    }
+  }
+
+  /** Calls the echo service on the master through the admin's coprocessor channel. */
+  @Test
+  public void testMasterCoprocessorService() throws Throwable {
+    Admin admin = util.getHBaseAdmin();
+    final TestProtos.EchoRequestProto request =
+        TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
+    assertEquals("hello", service.echo(null, request).getMessage());
+  }
+
+  /**
+   * Calls the {@code error} method of the test service on the region hosting
+   * {@code ROWS[0]} and expects a {@code ServiceException}.
+   */
+  @Test
+  public void testCoprocessorError() throws Exception {
+    // The table comes from the utility's shared connection, so the call runs with the
+    // client settings applied to the utility's configuration in setupBeforeClass.
+    try (Table table = util.getConnection().getTable(TEST_TABLE)) {
+      CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
+
+      TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+          TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
+
+      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+      fail("Should have thrown an exception");
+    } catch (ServiceException expected) {
+      // This is the outcome under test; nothing further to check.
+    }
+  }
+
+  /**
+   * Calls the {@code error} method of the test service on the master and expects a
+   * {@code ServiceException}.
+   */
+  @Test
+  public void testMasterCoprocessorError() throws Throwable {
+    Admin admin = util.getHBaseAdmin();
+    TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+        TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
+    try {
+      service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+      fail("Should have thrown an exception");
+    } catch (ServiceException expected) {
+      // This is the outcome under test; nothing further to check.
+    }
+  }
+
+  /**
+   * Builds {@code n} row keys by appending the two-digit, zero-padded index to
+   * {@code base}.
+   */
+  private static byte[][] makeN(byte[] base, int n) {
+    byte[][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
+      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+    }
+    return ret;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
new file mode 100644
index 0000000..4913acf
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Tests {@code ColumnAggregationEndpoint} when it is attached through the table
+ * descriptor, either at table creation or by modifying an existing table.
+ * <p>
+ * Each test table is created with two split keys and loaded with {@code ROWSIZE} rows;
+ * the value stored in row {@code i} is the integer {@code i}.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestCoprocessorTableEndpoint {
+
+  private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+  private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+  private static final byte[] ROW = Bytes.toBytes("testRow");
+  private static final int ROWSIZE = 20;
+  private static final int rowSeperator1 = 5;
+  private static final int rowSeperator2 = 12;
+  private static final byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /** Starts the mini cluster shared by all tests in this class. */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  /** Shuts the mini cluster down once all tests in this class have run. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Adds the endpoint to the descriptor before the table is created, then verifies sums. */
+  @Test
+  public void testCoprocessorTableEndpoint() throws Throwable {
+    final TableName tableName = TableName.valueOf("testCoprocessorTableEndpoint");
+
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+    desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
+
+    createTable(desc);
+    verifyTable(tableName);
+  }
+
+  /** Creates the table first, adds the endpoint by modifying it, then verifies sums. */
+  @Test
+  public void testDynamicCoprocessorTableEndpoint() throws Throwable {
+    final TableName tableName = TableName.valueOf("testDynamicCoprocessorTableEndpoint");
+
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+
+    createTable(desc);
+
+    desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
+    updateTable(desc);
+
+    verifyTable(tableName);
+  }
+
+  /**
+   * Builds {@code n} row keys by appending the two-digit, zero-padded index to
+   * {@code base}.
+   */
+  private static byte[][] makeN(byte[] base, int n) {
+    byte[][] ret = new byte[n][];
+    for (int i = 0; i < n; i++) {
+      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+    }
+    return ret;
+  }
+
+  /**
+   * Invokes the column aggregation service's {@code sum} call over the given row range.
+   *
+   * @param table table to run the coprocessor call against
+   * @param family column family placed in the request
+   * @param qualifier column qualifier; only set on the request when non-null and non-empty
+   * @param start start row passed to {@code coprocessorService}
+   * @param end end row passed to {@code coprocessorService}
+   * @return the map returned by {@code coprocessorService}, holding one sum per entry
+   */
+  private static Map<byte [], Long> sum(final Table table, final byte [] family,
+      final byte [] qualifier, final byte [] start, final byte [] end)
+      throws ServiceException, Throwable {
+    return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
+        start, end,
+      new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
+        @Override
+        public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
+        throws IOException {
+          CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
+              new CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
+          ColumnAggregationProtos.SumRequest.Builder builder =
+            ColumnAggregationProtos.SumRequest.newBuilder();
+          builder.setFamily(ByteString.copyFrom(family));
+          if (qualifier != null && qualifier.length > 0) {
+            builder.setQualifier(ByteString.copyFrom(qualifier));
+          }
+          instance.sum(null, builder.build(), rpcCallback);
+          return rpcCallback.get().getSum();
+        }
+      });
+  }
+
+  /**
+   * Creates the table with two split keys, waits for its regions to be assigned and
+   * writes one cell per row whose value is the row index.
+   */
+  private static void createTable(HTableDescriptor desc) throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+    TEST_UTIL.waitUntilAllRegionsAssigned(desc.getTableName());
+    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
+      for (int i = 0; i < ROWSIZE; i++) {
+        Put put = new Put(ROWS[i]);
+        put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+        table.put(put);
+      }
+    }
+  }
+
+  /** Disables the table, applies the given descriptor and enables the table again. */
+  private static void updateTable(HTableDescriptor desc) throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    admin.disableTable(desc.getTableName());
+    admin.modifyTable(desc.getTableName(), desc);
+    admin.enableTable(desc.getTableName());
+  }
+
+  /** Adds up the values of a result map; kept as a long to avoid silent narrowing. */
+  private static long total(Map<byte[], Long> results) {
+    long total = 0;
+    for (Long value : results.values()) {
+      total += value;
+    }
+    return total;
+  }
+
+  /** Sum of the values written by createTable for rows firstRow..ROWSIZE-1. */
+  private static long expectedSum(int firstRow) {
+    long expected = 0;
+    for (int i = firstRow; i < ROWSIZE; i++) {
+      expected += i;
+    }
+    return expected;
+  }
+
+  /**
+   * Sums the test column over all rows and then over the rows starting at the first
+   * split key, and compares each with the sum of the values that were written.
+   */
+  private static void verifyTable(TableName tableName) throws Throwable {
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
+        ROWS[ROWS.length-1]);
+      assertEquals("Invalid result", expectedSum(0), total(results));
+
+      // scan: for region 2 and region 3
+      results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length-1]);
+      assertEquals("Invalid result", expectedSum(rowSeperator1), total(results));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
new file mode 100644
index 0000000..31646f8
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+/**
+ * Tests a coprocessor endpoint registered at region-server level. The nested
+ * {@link DummyRegionServerEndpoint} is configured under
+ * {@code CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY} and is called through the
+ * admin's per-server coprocessor channel.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestRegionServerCoprocessorEndpoint {
+  public static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
+  private static HBaseTestingUtility TEST_UTIL = null;
+  private static Configuration CONF = null;
+  private static final String DUMMY_VALUE = "val";
+
+  /** Registers the dummy endpoint in the configuration and starts the mini cluster. */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    CONF = TEST_UTIL.getConfiguration();
+    CONF.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+      DummyRegionServerEndpoint.class.getName());
+    TEST_UTIL.startMiniCluster();
+  }
+
+  /** Shuts the mini cluster down once all tests in this class have run. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /** Creates a stub for the dummy service on the cluster's region server 0. */
+  private static DummyRegionServerEndpointProtos.DummyService newDummyServiceStub()
+      throws Exception {
+    final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    return ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class,
+        TEST_UTIL.getHBaseAdmin().coprocessorService(serverName));
+  }
+
+  /** Calls {@code dummyCall} and expects the endpoint's fixed value in the response. */
+  @Test
+  public void testEndpoint() throws Exception {
+    final ServerRpcController controller = new ServerRpcController();
+    final CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>
+        rpcCallback =
+      new CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>();
+    DummyRegionServerEndpointProtos.DummyService service = newDummyServiceStub();
+    service.dummyCall(controller,
+        DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback);
+    DummyRegionServerEndpointProtos.DummyResponse response = rpcCallback.get();
+    // Surface a failed call as its real exception before touching the response, which
+    // would otherwise fail with a less helpful NullPointerException.
+    if (controller.failedOnException()) {
+      throw controller.getFailedOn();
+    }
+    assertEquals(DUMMY_VALUE, response.getValue());
+  }
+
+  /**
+   * Calls {@code dummyThrow} and expects no response, a failed controller, and a remote
+   * cause whose class name matches {@link #WHAT_TO_THROW}.
+   */
+  @Test
+  public void testEndpointExceptions() throws Exception {
+    final ServerRpcController controller = new ServerRpcController();
+    final CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>
+        rpcCallback =
+      new CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>();
+    DummyRegionServerEndpointProtos.DummyService service = newDummyServiceStub();
+    service.dummyThrow(controller,
+        DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback);
+    assertEquals(null, rpcCallback.get());
+    assertTrue(controller.failedOnException());
+    assertEquals(WHAT_TO_THROW.getClass().getName().trim(),
+        ((RemoteWithExtrasException) controller.getFailedOn().getCause()).getClassName().trim());
+  }
+
+  /**
+   * Minimal region-server endpoint: {@code dummyCall} answers with {@code DUMMY_VALUE}
+   * and {@code dummyThrow} reports {@code WHAT_TO_THROW} through the controller.
+   */
+  static class DummyRegionServerEndpoint extends DummyService
+      implements Coprocessor, SingletonCoprocessorService {
+
+    @Override
+    public Service getService() {
+      return this;
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+      // Intentionally empty: this endpoint does nothing on start.
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+      // Intentionally empty: this endpoint does nothing on stop.
+    }
+
+    @Override
+    public void dummyCall(RpcController controller, DummyRequest request,
+        RpcCallback<DummyResponse> callback) {
+      callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
+    }
+
+    @Override
+    public void dummyThrow(RpcController controller,
+        DummyRequest request,
+        RpcCallback<DummyResponse> done) {
+      // The callback is not invoked here; the failure is only recorded on the controller.
+      CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
new file mode 100644
index 0000000..7cae0bc
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
+import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Verifies ProcessEndpoint works.
+ * The tested RowProcessor performs two scans and a read-modify-write.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestRowProcessorEndpoint {
+
+  private static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);
+
+  // Table, rows and column family used by every test in this class
+  private static final TableName TABLE = TableName.valueOf("testtable");
+  private final static byte[] ROW = Bytes.toBytes("testrow");
+  private final static byte[] ROW2 = Bytes.toBytes("testrow2");
+  private final static byte[] FAM = Bytes.toBytes("friendlist");
+
+  // Column names
+  private final static byte[] A = Bytes.toBytes("a");
+  private final static byte[] B = Bytes.toBytes("b");
+  private final static byte[] C = Bytes.toBytes("c");
+  private final static byte[] D = Bytes.toBytes("d");
+  private final static byte[] E = Bytes.toBytes("e");
+  private final static byte[] F = Bytes.toBytes("f");
+  private final static byte[] G = Bytes.toBytes("g");
+  private final static byte[] COUNTER = Bytes.toBytes("counter");
+  // Source of the timestamps RowSwapProcessor uses in place of the supplied "now"
+  private final static AtomicLong myTimer = new AtomicLong(0);
+  // Failures counted by the worker threads started from concurrentExec
+  private final AtomicInteger failures = new AtomicInteger(0);
+
+  private static HBaseTestingUtility util = new HBaseTestingUtility();
+  // Value IncrementCounterProcessor expects to read before it increments the counter
+  private static volatile int expectedCounter = 0;
+  // Put.size() of the Puts written to ROW and ROW2 by prepareTestData
+  private static int rowSize, row2Size;
+
+  private volatile static Table table = null;
+  // Toggled by RowSwapProcessor each time it processes a swap
+  private volatile static boolean swapped = false;
+  // Latches created by concurrentExec to line up thread start and wait for completion
+  private volatile CountDownLatch startSignal;
+  private volatile CountDownLatch doneSignal;
+
+  /**
+   * Registers {@link RowProcessorEndpoint} as a region coprocessor, sets the client
+   * retry count, row-processor timeout and call-queue length, then starts the mini cluster.
+   */
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    final String endpointClassName = RowProcessorEndpoint.class.getName();
+    Configuration conf = util.getConfiguration();
+    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, endpointClassName);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
+    conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048);
+    util.startMiniCluster();
+  }
+
+  /** Shuts down the mini cluster started by {@link #setupBeforeClass()}. */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Drops and recreates the test table, writes the initial cells of {@code ROW} and
+   * {@code ROW2}, and records their sizes in {@code rowSize} and {@code row2Size}.
+   *
+   * The static bookkeeping that mirrors the table content ({@code expectedCounter} and
+   * {@code swapped}) is reset as well, so that it always matches the freshly created table
+   * even when this method runs more than once in the same JVM.
+   *
+   * @throws Exception if the table cannot be created or the initial rows cannot be written
+   */
+  public void prepareTestData() throws Exception {
+    try {
+      util.getHBaseAdmin().disableTable(TABLE);
+      util.getHBaseAdmin().deleteTable(TABLE);
+    } catch (Exception e) {
+      // ignore table not found
+    }
+    table = util.createTable(TABLE, FAM);
+
+    // A new table has no counter cell and its rows are in their original positions
+    expectedCounter = 0;
+    swapped = false;
+
+    Put rowPut = new Put(ROW);
+    rowPut.addColumn(FAM, A, Bytes.add(B, C));    // B, C are friends of A
+    rowPut.addColumn(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
+    rowPut.addColumn(FAM, C, G);                  // G is a friend of C
+    table.put(rowPut);
+    rowSize = rowPut.size();
+
+    Put row2Put = new Put(ROW2);
+    row2Put.addColumn(FAM, D, E);
+    row2Put.addColumn(FAM, F, G);
+    table.put(row2Put);
+    row2Size = row2Put.size();
+  }
+
+  /**
+   * Runs FriendsOfFriendsProcessor for person {@code A} on {@code ROW} and checks
+   * that the returned set is exactly d, e, f and g.
+   */
+  @Test
+  public void testDoubleScan() throws Throwable {
+    prepareTestData();
+
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+    RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
+        new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
+    ProcessResponse protoResult = service.process(null, request);
+    FriendsOfFriendsProcessorResponse response =
+        FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
+
+    Set<String> actual = new HashSet<String>(response.getResultList());
+    Set<String> expected = new HashSet<String>(Arrays.asList("d", "e", "f", "g"));
+
+    List<Cell> rowCells = table.get(new Get(ROW)).listCells();
+    LOG.debug("row keyvalues:" + stringifyKvs(rowCells));
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * Increments the counter from 100 concurrent threads, then once more from the test
+   * thread, and checks that the last increment returns 101 with no recorded failures.
+   */
+  @Test
+  public void testReadModifyWrite() throws Throwable {
+    prepareTestData();
+    failures.set(0);
+    final int numThreads = 100;
+    concurrentExec(new IncrementRunner(), numThreads);
+    List<Cell> rowCells = table.get(new Get(ROW)).listCells();
+    LOG.debug("row keyvalues:" + stringifyKvs(rowCells));
+    // One extra increment from this thread reads back the final value
+    assertEquals(numThreads + 1, incrementCounter(table));
+    assertEquals(0, failures.get());
+  }
+
+  class IncrementRunner implements Runnable {
+    @Override
+    public void run() {
+      try {
+        incrementCounter(table);
+      } catch (Throwable e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Sends one IncrementCounterProcessor request for {@code ROW} through the row
+   * processor endpoint.
+   *
+   * @param table table whose coprocessor channel is used
+   * @return the value carried by the processor's response
+   */
+  private int incrementCounter(Table table) throws Throwable {
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    ProcessRequest request = RowProcessorClient.getRowProcessorPB(
+        new RowProcessorEndpoint.IncrementCounterProcessor(ROW));
+    ProcessResponse protoResult = service.process(null, request);
+    return IncCounterProcessorResponse
+        .parseFrom(protoResult.getRowProcessorResult())
+        .getResponse();
+  }
+
+  private void concurrentExec(
+      final Runnable task, final int numThreads) throws Throwable {
+    startSignal = new CountDownLatch(numThreads);
+    doneSignal = new CountDownLatch(numThreads);
+    for (int i = 0; i < numThreads; ++i) {
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            startSignal.countDown();
+            startSignal.await();
+            task.run();
+          } catch (Throwable e) {
+            failures.incrementAndGet();
+            e.printStackTrace();
+          }
+          doneSignal.countDown();
+        }
+      }).start();
+    }
+    doneSignal.await();
+  }
+
+  /**
+   * Swaps {@code ROW} and {@code ROW2} from 100 concurrent threads and checks that
+   * both rows end up with their original number of cells and no recorded failures.
+   */
+  @Test
+  public void testMultipleRows() throws Throwable {
+    prepareTestData();
+    failures.set(0);
+    final int numThreads = 100;
+    concurrentExec(new SwapRowsRunner(), numThreads);
+
+    List<Cell> loggedRow = table.get(new Get(ROW)).listCells();
+    LOG.debug("row keyvalues:" + stringifyKvs(loggedRow));
+    List<Cell> loggedRow2 = table.get(new Get(ROW2)).listCells();
+    LOG.debug("row2 keyvalues:" + stringifyKvs(loggedRow2));
+
+    int rowCellCount = table.get(new Get(ROW)).listCells().size();
+    assertEquals(rowSize, rowCellCount);
+    int row2CellCount = table.get(new Get(ROW2)).listCells().size();
+    assertEquals(row2Size, row2CellCount);
+    assertEquals(0, failures.get());
+  }
+
+  class SwapRowsRunner implements Runnable {
+    @Override
+    public void run() {
+      try {
+        swapRows(table);
+      } catch (Throwable e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Sends one RowSwapProcessor request for {@code ROW} and {@code ROW2} through the
+   * row processor endpoint; the response is not inspected.
+   *
+   * @param table table whose coprocessor channel is used
+   */
+  private void swapRows(Table table) throws Throwable {
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    ProcessRequest request = RowProcessorClient.getRowProcessorPB(
+        new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2));
+    service.process(null, request);
+  }
+
+  /**
+   * Sends a TimeoutProcessor request and checks that the call ends with an exception.
+   */
+  @Test
+  public void testTimeout() throws Throwable {
+    prepareTestData();
+    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+    RowProcessorEndpoint.TimeoutProcessor processor =
+        new RowProcessorEndpoint.TimeoutProcessor(ROW);
+    RowProcessorService.BlockingInterface service =
+        RowProcessorService.newBlockingStub(channel);
+    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
+
+    boolean exceptionCaught;
+    try {
+      service.process(null, request);
+      exceptionCaught = false;
+    } catch (Exception e) {
+      exceptionCaught = true;
+    }
+    assertTrue(exceptionCaught);
+  }
+
+  /**
+   * This class defines four RowProcessors: IncrementCounterProcessor,
+   * FriendsOfFriendsProcessor, RowSwapProcessor and TimeoutProcessor.
+   *
+   * We define the RowProcessors as inner classes of the endpoint
+   * so that they can be loaded with the endpoint on the coprocessor.
+   */
+  public static class RowProcessorEndpoint<S extends Message,T extends Message>
+  extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
+    /**
+     * Row processor that reads the {@code COUNTER} cell of one row, checks it against
+     * {@code expectedCounter}, and writes back the value plus one.
+     */
+    public static class IncrementCounterProcessor extends
+        BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
+        IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
+      int counter = 0;
+      byte[] row = new byte[0];
+
+      /**
+       * No-argument constructor; the fields keep their defaults until
+       * {@link #initialize(IncCounterProcessorRequest)} is called.
+       */
+      IncrementCounterProcessor() {
+      }
+
+      IncrementCounterProcessor(byte[] row) {
+        this.row = row;
+      }
+
+      /** Only the counter's row is locked. */
+      @Override
+      public Collection<byte[]> getRowsToLock() {
+        return Collections.singleton(row);
+      }
+
+      /** Returns a response carrying the current {@code counter} value. */
+      @Override
+      public IncCounterProcessorResponse getResult() {
+        return IncCounterProcessorResponse.newBuilder().setResponse(counter).build();
+      }
+
+      @Override
+      public boolean readOnly() {
+        return false;
+      }
+
+      @Override
+      public void process(long now, HRegion region,
+          List<Mutation> mutations, WALEdit walEdit) throws IOException {
+        // Read the stored counter; a missing cell counts as zero
+        List<Cell> cells = new ArrayList<Cell>();
+        Scan counterScan = new Scan(row, row);
+        counterScan.addColumn(FAM, COUNTER);
+        doScan(region, counterScan, cells);
+        if (cells.isEmpty()) {
+          counter = 0;
+        } else {
+          counter = Bytes.toInt(CellUtil.cloneValue(cells.get(0)));
+        }
+
+        // The stored value must match what the test has counted so far
+        assertEquals(expectedCounter, counter);
+
+        counter++;
+        expectedCounter++;
+
+        // The new value goes to both the mutation list and the wal edit
+        KeyValue kv = new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
+        Put p = new Put(row);
+        p.add(kv);
+        mutations.add(p);
+        walEdit.add(kv);
+
+        // Extra meta data can be injected into the wal edit as well
+        byte[] metaQualifier = Bytes.toBytes("I just increment counter");
+        KeyValue metaKv =
+            new KeyValue(row, WALEdit.METAFAMILY, metaQualifier, Bytes.toBytes(counter));
+        walEdit.add(metaKv);
+      }
+
+      /** Serializes {@code counter} and {@code row} into a request message. */
+      @Override
+      public IncCounterProcessorRequest getRequestData() throws IOException {
+        return IncCounterProcessorRequest.newBuilder()
+            .setCounter(counter)
+            .setRow(ByteStringer.wrap(row))
+            .build();
+      }
+
+      /** Restores {@code row} and {@code counter} from a request message. */
+      @Override
+      public void initialize(IncCounterProcessorRequest msg) {
+        this.row = msg.getRow().toByteArray();
+        this.counter = msg.getCounter();
+      }
+    }
+
+    /**
+     * Read-only row processor that collects the friends of a person's friends.
+     *
+     * Each column of the row is named after a person, and the bytes of the cell value
+     * are that person's friends (one byte per friend). A first scan reads the friends
+     * of {@code person}; a second scan reads the columns named after those friends.
+     */
+    public static class FriendsOfFriendsProcessor extends
+        BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
+      byte[] row = null;
+      byte[] person = null;
+      final Set<String> result = new HashSet<String>();
+
+      /**
+       * No-argument constructor; {@code row} and {@code person} stay null until
+       * {@link #initialize(FriendsOfFriendsProcessorRequest)} is called.
+       */
+      FriendsOfFriendsProcessor() {
+      }
+
+      FriendsOfFriendsProcessor(byte[] row, byte[] person) {
+        this.row = row;
+        this.person = person;
+      }
+
+      @Override
+      public Collection<byte[]> getRowsToLock() {
+        return Collections.singleton(row);
+      }
+
+      /** Returns a response carrying the collected friends of friends. */
+      @Override
+      public FriendsOfFriendsProcessorResponse getResult() {
+        FriendsOfFriendsProcessorResponse.Builder builder =
+            FriendsOfFriendsProcessorResponse.newBuilder();
+        builder.addAllResult(result);
+        return builder.build();
+      }
+
+      @Override
+      public boolean readOnly() {
+        return true;
+      }
+
+      /**
+       * Fills {@code result} with the friends of the friends of {@code person}.
+       * When the person has no friends the result is left empty.
+       */
+      @Override
+      public void process(long now, HRegion region,
+          List<Mutation> mutations, WALEdit walEdit) throws IOException {
+        List<Cell> kvs = new ArrayList<Cell>();
+        { // First scan to get friends of the person
+          Scan scan = new Scan(row, row);
+          scan.addColumn(FAM, person);
+          doScan(region, scan, kvs);
+        }
+
+        // Second scan to get friends of friends
+        Scan scan = new Scan(row, row);
+        boolean hasFriends = false;
+        for (Cell kv : kvs) {
+          for (byte f : CellUtil.cloneValue(kv)) {
+            scan.addColumn(FAM, new byte[]{f});
+            hasFriends = true;
+          }
+        }
+
+        result.clear();
+        if (!hasFriends) {
+          // A Scan without any column selects every cell of the row, which would
+          // report unrelated values as friends of friends; nothing to collect here.
+          return;
+        }
+        doScan(region, scan, kvs);
+
+        // Collect result
+        for (Cell kv : kvs) {
+          for (byte b : CellUtil.cloneValue(kv)) {
+            result.add(String.valueOf((char) b));
+          }
+        }
+      }
+
+      /** Serializes {@code person}, {@code row} and the current result into a request. */
+      @Override
+      public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
+        FriendsOfFriendsProcessorRequest.Builder builder =
+            FriendsOfFriendsProcessorRequest.newBuilder();
+        builder.setPerson(ByteStringer.wrap(person));
+        builder.setRow(ByteStringer.wrap(row));
+        builder.addAllResult(result);
+        return builder.build();
+      }
+
+      /** Restores {@code person}, {@code row} and the result set from a request. */
+      @Override
+      public void initialize(FriendsOfFriendsProcessorRequest request)
+          throws IOException {
+        this.person = request.getPerson().toByteArray();
+        this.row = request.getRow().toByteArray();
+        result.clear();
+        result.addAll(request.getResultList());
+      }
+    }
+
+    /**
+     * Row processor that moves every cell of {@code row1} to {@code row2} and every
+     * cell of {@code row2} to {@code row1}, checking the cell counts of both rows
+     * against {@code rowSize} and {@code row2Size} before doing so.
+     */
+    public static class RowSwapProcessor extends
+        BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
+      byte[] row1 = new byte[0];
+      byte[] row2 = new byte[0];
+
+      /**
+       * No-argument constructor; both rows stay empty until
+       * {@link #initialize(RowSwapProcessorRequest)} is called.
+       */
+      RowSwapProcessor() {
+      }
+
+      RowSwapProcessor(byte[] row1, byte[] row2) {
+        this.row1 = row1;
+        this.row2 = row2;
+      }
+
+      /** Both rows are locked, row1 first. */
+      @Override
+      public Collection<byte[]> getRowsToLock() {
+        List<byte[]> rows = new ArrayList<byte[]>();
+        rows.add(row1);
+        rows.add(row2);
+        return rows;
+      }
+
+      @Override
+      public boolean readOnly() {
+        return false;
+      }
+
+      /** The swap carries no payload back; the default response instance is returned. */
+      @Override
+      public RowSwapProcessorResponse getResult() {
+        return RowSwapProcessorResponse.getDefaultInstance();
+      }
+
+      /**
+       * Scans both rows, asserts their sizes according to the {@code swapped} flag,
+       * toggles the flag, and then emits for every cell a delete marker on its current
+       * row and a put of the same family, qualifier and value on the other row.
+       */
+      @Override
+      public void process(long now, HRegion region,
+          List<Mutation> mutations, WALEdit walEdit) throws IOException {
+
+        // Override the time to avoid race-condition in the unit test caused by
+        // inaccurate timer on some machines
+        now = myTimer.getAndIncrement();
+
+        // Scan both rows
+        List<Cell> kvs1 = new ArrayList<Cell>();
+        List<Cell> kvs2 = new ArrayList<Cell>();
+        doScan(region, new Scan(row1, row1), kvs1);
+        doScan(region, new Scan(row2, row2), kvs2);
+
+        // Assert swapped
+        if (swapped) {
+          assertEquals(rowSize, kvs2.size());
+          assertEquals(row2Size, kvs1.size());
+        } else {
+          assertEquals(rowSize, kvs1.size());
+          assertEquals(row2Size, kvs2.size());
+        }
+        swapped = !swapped;
+
+        // Add and delete keyvalues
+        List<List<Cell>> kvs = new ArrayList<List<Cell>>();
+        kvs.add(kvs1);
+        kvs.add(kvs2);
+        byte[][] rows = new byte[][]{row1, row2};
+        for (int i = 0; i < kvs.size(); ++i) {
+          for (Cell kv : kvs.get(i)) {
+            // Delete from the current row and add to the other row
+            Delete d = new Delete(rows[i]);
+            // The delete marker reuses the cell's own timestamp
+            KeyValue kvDelete =
+                new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), 
+                    kv.getTimestamp(), KeyValue.Type.Delete);
+            d.addDeleteMarker(kvDelete);
+            // rows[1 - i] is the other row; the new cell is stamped with the overridden "now"
+            Put p = new Put(rows[1 - i]);
+            KeyValue kvAdd =
+                new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+                    now, CellUtil.cloneValue(kv));
+            p.add(kvAdd);
+            // Each mutation is followed by its KeyValue in the wal edit
+            mutations.add(d);
+            walEdit.add(kvDelete);
+            mutations.add(p);
+            walEdit.add(kvAdd);
+          }
+        }
+      }
+
+      @Override
+      public String getName() {
+        return "swap";
+      }
+
+      /** Serializes {@code row1} and {@code row2} into a request message. */
+      @Override
+      public RowSwapProcessorRequest getRequestData() throws IOException {
+        RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
+        builder.setRow1(ByteStringer.wrap(row1));
+        builder.setRow2(ByteStringer.wrap(row2));
+        return builder.build();
+      }
+
+      /** Restores {@code row1} and {@code row2} from a request message. */
+      @Override
+      public void initialize(RowSwapProcessorRequest msg) {
+        this.row1 = msg.getRow1().toByteArray();
+        this.row2 = msg.getRow2().toByteArray();
+      }
+    }
+
+    /**
+     * Read-only row processor whose {@code process} sleeps for 100 seconds, used to
+     * provoke a timeout of the row processor call.
+     */
+    public static class TimeoutProcessor extends
+        BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
+
+      byte[] row = new byte[0];
+
+      /**
+       * No-argument constructor; {@code row} stays empty until
+       * {@link #initialize(TimeoutProcessorRequest)} is called.
+       */
+      public TimeoutProcessor() {
+      }
+
+      public TimeoutProcessor(byte[] row) {
+        this.row = row;
+      }
+
+      @Override
+      public Collection<byte[]> getRowsToLock() {
+        return Collections.singleton(row);
+      }
+
+      /** There is no payload to return; the default response instance is used. */
+      @Override
+      public TimeoutProcessorResponse getResult() {
+        return TimeoutProcessorResponse.getDefaultInstance();
+      }
+
+      /**
+       * Sleeps long enough for the call to time out.
+       *
+       * @throws IOException wrapping the InterruptedException if the sleep is interrupted
+       */
+      @Override
+      public void process(long now, HRegion region,
+          List<Mutation> mutations, WALEdit walEdit) throws IOException {
+        try {
+          // Sleep for a long time so it timeout
+          Thread.sleep(100 * 1000L);
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to the calling thread before converting it
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        }
+      }
+
+      @Override
+      public boolean readOnly() {
+        return true;
+      }
+
+      @Override
+      public String getName() {
+        return "timeout";
+      }
+
+      /** Serializes {@code row} into a request message. */
+      @Override
+      public TimeoutProcessorRequest getRequestData() throws IOException {
+        TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
+        builder.setRow(ByteStringer.wrap(row));
+        return builder.build();
+      }
+
+      /** Restores {@code row} from a request message. */
+      @Override
+      public void initialize(TimeoutProcessorRequest msg) throws IOException {
+        this.row = msg.getRow().toByteArray();
+      }
+    }
+
+    /**
+     * Runs {@code scan} against {@code region} with READ_UNCOMMITTED isolation and
+     * replaces the content of {@code result} with the cells from a single
+     * {@code next} call on the scanner. The scanner is always closed.
+     *
+     * @param region region to scan
+     * @param scan scan to execute; its isolation level is overwritten
+     * @param result list that is cleared and then filled with the scanned cells
+     */
+    public static void doScan(
+        HRegion region, Scan scan, List<Cell> result) throws IOException {
+      scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+      InternalScanner scanner = region.getScanner(scan);
+      try {
+        result.clear();
+        scanner.next(result);
+      } finally {
+        scanner.close();
+      }
+    }
+  }
+
+  /**
+   * Renders cells as {@code [qualifier:value qualifier:value ]} for log output.
+   * The value of the {@code COUNTER} column is printed as an int, every other value
+   * as a binary string. A null collection yields {@code []}.
+   *
+   * @param kvs cells to render, may be null
+   * @return the rendered text
+   */
+  static String stringifyKvs(Collection<Cell> kvs) {
+    StringBuilder out = new StringBuilder("[");
+    if (kvs != null) {
+      for (Cell kv : kvs) {
+        byte[] col = CellUtil.cloneQualifier(kv);
+        byte[] val = CellUtil.cloneValue(kv);
+        out.append(Bytes.toStringBinary(col)).append(":");
+        if (Bytes.equals(col, COUNTER)) {
+          out.append(Bytes.toInt(val));
+        } else {
+          out.append(Bytes.toStringBinary(val));
+        }
+        out.append(" ");
+      }
+    }
+    return out.append("]").toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java
new file mode 100644
index 0000000..15a2747
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java
@@ -0,0 +1,45 @@
+/*
+
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.Descriptors;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCoprocessorRpcUtils {
+  /**
+   * Checks the name {@code CoprocessorRpcUtils.getServiceName} derives from an
+   * HBase-provided service descriptor and from a non-HBase one.
+   */
+  @Test
+  public void testServiceName() throws Exception {
+    // Built-in HBase rpc services are de-namespaced: only the short name remains
+    Descriptors.ServiceDescriptor authService =
+        AuthenticationProtos.AuthenticationService.getDescriptor();
+    String authServiceName = CoprocessorRpcUtils.getServiceName(authService);
+    assertEquals(authService.getName(), authServiceName);
+
+    // Non-HBase rpc services keep their fully qualified name
+    Descriptors.ServiceDescriptor dummyService =
+        DummyRegionServerEndpointProtos.DummyService.getDescriptor();
+    String dummyServiceName = CoprocessorRpcUtils.getServiceName(dummyService);
+    assertEquals(dummyService.getFullName(), dummyServiceName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
new file mode 100644
index 0000000..a82900d
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint.
+ * Each call goes through a coprocessor channel of the wrapped {@link Table}; anything thrown
+ * along the way, a failure recorded on the rpc controller included, is rethrown wrapped in
+ * an {@link IOException}.
+ * @deprecated Use for backward compatibility testing only. Will be removed when
+ *             SecureBulkLoadEndpoint is not supported.
+ */
+@InterfaceAudience.Private
+public class SecureBulkLoadEndpointClient {
+  private Table table;
+
+  public SecureBulkLoadEndpointClient(Table table) {
+    this.table = table;
+  }
+
+  /**
+   * Sends a PrepareBulkLoad request for {@code tableName} over the channel obtained for the
+   * empty start row.
+   * @return the bulk token carried by the response
+   */
+  public String prepareBulkLoad(final TableName tableName) throws IOException {
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService stub = newStub(HConstants.EMPTY_START_ROW);
+      ServerRpcController rpcController = new ServerRpcController();
+      CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse> done =
+          new CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse>();
+
+      PrepareBulkLoadRequest.Builder builder = PrepareBulkLoadRequest.newBuilder();
+      builder.setTableName(ProtobufUtil.toProtoTableName(tableName));
+      stub.prepareBulkLoad(rpcController, builder.build(), done);
+
+      PrepareBulkLoadResponse reply = done.get();
+      if (rpcController.failedOnException()) {
+        throw rpcController.getFailedOn();
+      }
+      return reply.getBulkToken();
+    } catch (Throwable cause) {
+      throw new IOException(cause);
+    }
+  }
+
+  /**
+   * Sends a CleanupBulkLoad request carrying {@code bulkToken}; the response is not read back.
+   */
+  public void cleanupBulkLoad(final String bulkToken) throws IOException {
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService stub = newStub(HConstants.EMPTY_START_ROW);
+      ServerRpcController rpcController = new ServerRpcController();
+      CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse> done =
+          new CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse>();
+
+      CleanupBulkLoadRequest request =
+          CleanupBulkLoadRequest.newBuilder().setBulkToken(bulkToken).build();
+      stub.cleanupBulkLoad(rpcController, request, done);
+      if (rpcController.failedOnException()) {
+        throw rpcController.getFailedOn();
+      }
+    } catch (Throwable cause) {
+      throw new IOException(cause);
+    }
+  }
+
+  /**
+   * Sends a SecureBulkLoadHFiles request over the channel obtained for {@code startRow}.
+   * @param userToken copied into the request's fs token; when null an empty token is sent
+   * @return the loaded flag of the response
+   */
+  public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
+                         final Token<?> userToken,
+                         final String bulkToken,
+                         final byte[] startRow) throws IOException {
+    // we never want to send a batch of HFiles to all regions, thus cannot call
+    // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
+    try {
+      SecureBulkLoadProtos.SecureBulkLoadService stub = newStub(startRow);
+
+      DelegationToken.Builder tokenBuilder = DelegationToken.newBuilder();
+      if (userToken != null) {
+        tokenBuilder.setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+            .setPassword(ByteStringer.wrap(userToken.getPassword()))
+            .setKind(userToken.getKind().toString())
+            .setService(userToken.getService().toString());
+      }
+      DelegationToken protoDT = tokenBuilder.build();
+
+      List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
+          new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
+      for (Pair<byte[], String> familyPath : familyPaths) {
+        protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
+            .setFamily(ByteStringer.wrap(familyPath.getFirst()))
+            .setPath(familyPath.getSecond()).build());
+      }
+
+      SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.Builder builder =
+          SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder();
+      builder.setFsToken(protoDT).addAllFamilyPath(protoFamilyPaths);
+      builder.setBulkToken(bulkToken);
+      SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = builder.build();
+
+      ServerRpcController rpcController = new ServerRpcController();
+      CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>
+          done = new CoprocessorRpcUtils.BlockingRpcCallback<
+              SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
+      stub.secureBulkLoadHFiles(rpcController, request, done);
+
+      SecureBulkLoadProtos.SecureBulkLoadHFilesResponse reply = done.get();
+      if (rpcController.failedOnException()) {
+        throw rpcController.getFailedOn();
+      }
+      return reply.getLoaded();
+    } catch (Throwable cause) {
+      throw new IOException(cause);
+    }
+  }
+
+  /** Creates a service stub on the coprocessor channel the table returns for {@code row}. */
+  private SecureBulkLoadProtos.SecureBulkLoadService newStub(byte[] row) throws Exception {
+    CoprocessorRpcChannel channel = table.coprocessorService(row);
+    return ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
+  }
+
+}