Posted to commits@hbase.apache.org by sy...@apache.org on 2016/10/07 19:14:58 UTC
[30/77] [abbrv] [partial] hbase git commit: HBASE-15638 Shade protobuf. Which includes
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
new file mode 100644
index 0000000..f5d2a20
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestClassLoading.java
@@ -0,0 +1,563 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.TestServerCustomProtocol;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ClassLoaderTestHelper;
+import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.*;
+import java.util.*;
+
+import org.junit.*;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Test coprocessors class loading.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestClassLoading {
+ private static final Log LOG = LogFactory.getLog(TestClassLoading.class);
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static MiniDFSCluster cluster;
+
+ static final TableName tableName = TableName.valueOf("TestClassLoading");
+ static final String cpName1 = "TestCP1";
+ static final String cpName2 = "TestCP2";
+ static final String cpName3 = "TestCP3";
+ static final String cpName4 = "TestCP4";
+ static final String cpName5 = "TestCP5";
+ static final String cpName6 = "TestCP6";
+
+ private static Class<?> regionCoprocessor1 = ColumnAggregationEndpoint.class;
+ // TODO: Fix the import of this handler. It is coming in from a package that is far away.
+ private static Class<?> regionCoprocessor2 = TestServerCustomProtocol.PingHandler.class;
+ private static Class<?> regionServerCoprocessor = SampleRegionWALObserver.class;
+ private static Class<?> masterCoprocessor = BaseMasterObserver.class;
+
+ private static final String[] regionServerSystemCoprocessors =
+ new String[]{
+ regionServerCoprocessor.getSimpleName()
+ };
+
+ private static final String[] masterRegionServerSystemCoprocessors = new String[] {
+ regionCoprocessor1.getSimpleName(), MultiRowMutationEndpoint.class.getSimpleName(),
+ regionServerCoprocessor.getSimpleName() };
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+
+ // regionCoprocessor1 will be loaded on all regionservers, since it is
+ // loaded for all tables (user and meta).
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ regionCoprocessor1.getName());
+
+ // regionCoprocessor2 will be loaded only on regionservers that serve a
+ // user table region. Therefore, if there are no user tables loaded,
+ // this coprocessor will not be loaded on any regionserver.
+ conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+ regionCoprocessor2.getName());
+
+ conf.setStrings(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ regionServerCoprocessor.getName());
+ conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ masterCoprocessor.getName());
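+ // For reference, the same system coprocessors could be wired up statically in
+ // hbase-site.xml instead of through the Configuration. A minimal sketch,
+ // assuming the stock key names behind the CoprocessorHost constants above
+ // (e.g. REGION_COPROCESSOR_CONF_KEY == "hbase.coprocessor.region.classes"):
+ //   <property>
+ //     <name>hbase.coprocessor.region.classes</name>
+ //     <value>org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint</value>
+ //   </property>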
+ TEST_UTIL.startMiniCluster(1);
+ cluster = TEST_UTIL.getDFSCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ static File buildCoprocessorJar(String className) throws Exception {
+ String code = "import org.apache.hadoop.hbase.coprocessor.*;" +
+ "public class " + className + " extends BaseRegionObserver {}";
+ return ClassLoaderTestHelper.buildJar(
+ TEST_UTIL.getDataTestDir().toString(), className, code);
+ }
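+ // Note: ClassLoaderTestHelper.buildJar is assumed here to compile the one-line
+ // observer source above and package it as <className>.jar under the test data
+ // dir; that behavior is inferred from how the returned jar is used below.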
+
+ @Test
+ // HBASE-3516: Test CP Class loading from HDFS
+ public void testClassLoadingFromHDFS() throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+
+ File jarFile1 = buildCoprocessorJar(cpName1);
+ File jarFile2 = buildCoprocessorJar(cpName2);
+
+ // copy the jars into dfs
+ fs.copyFromLocalFile(new Path(jarFile1.getPath()),
+ new Path(fs.getUri().toString() + Path.SEPARATOR));
+ String jarFileOnHDFS1 = fs.getUri().toString() + Path.SEPARATOR +
+ jarFile1.getName();
+ Path pathOnHDFS1 = new Path(jarFileOnHDFS1);
+ assertTrue("Copy jar file to HDFS failed.",
+ fs.exists(pathOnHDFS1));
+ LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS1);
+
+ fs.copyFromLocalFile(new Path(jarFile2.getPath()),
+ new Path(fs.getUri().toString() + Path.SEPARATOR));
+ String jarFileOnHDFS2 = fs.getUri().toString() + Path.SEPARATOR +
+ jarFile2.getName();
+ Path pathOnHDFS2 = new Path(jarFileOnHDFS2);
+ assertTrue("Copy jar file to HDFS failed.",
+ fs.exists(pathOnHDFS2));
+ LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS2);
+
+ // create a table that references the coprocessors
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor("test"));
+ // without configuration values
+ htd.setValue("COPROCESSOR$1", jarFileOnHDFS1.toString() + "|" + cpName1 +
+ "|" + Coprocessor.PRIORITY_USER);
+ // with configuration values
+ htd.setValue("COPROCESSOR$2", jarFileOnHDFS2.toString() + "|" + cpName2 +
+ "|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
+ }
+ admin.deleteTable(tableName);
+ }
+ CoprocessorClassLoader.clearCache();
+ byte[] startKey = {10, 63};
+ byte[] endKey = {12, 43};
+ admin.createTable(htd, startKey, endKey, 4);
+ waitForTable(htd.getTableName());
+
+ // verify that the coprocessors were loaded
+ boolean foundTableRegion = false;
+ boolean found1 = true, found2 = true, found2_k1 = true, found2_k2 = true, found2_k3 = true;
+ Map<Region, Set<ClassLoader>> regionsActiveClassLoaders =
+ new HashMap<Region, Set<ClassLoader>>();
+ MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+ for (Region region:
+ hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ if (region.getRegionInfo().getRegionNameAsString().startsWith(tableName.getNameAsString())) {
+ foundTableRegion = true;
+ CoprocessorEnvironment env;
+ env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
+ found1 = found1 && (env != null);
+ env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
+ found2 = found2 && (env != null);
+ if (env != null) {
+ Configuration conf = env.getConfiguration();
+ found2_k1 = found2_k1 && (conf.get("k1") != null);
+ found2_k2 = found2_k2 && (conf.get("k2") != null);
+ found2_k3 = found2_k3 && (conf.get("k3") != null);
+ } else {
+ found2_k1 = found2_k2 = found2_k3 = false;
+ }
+ regionsActiveClassLoaders
+ .put(region, ((CoprocessorHost) region.getCoprocessorHost()).getExternalClassLoaders());
+ }
+ }
+
+ assertTrue("No region was found for table " + tableName, foundTableRegion);
+ assertTrue("Class " + cpName1 + " was missing on a region", found1);
+ assertTrue("Class " + cpName2 + " was missing on a region", found2);
+ assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
+ assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
+ assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
+ // check if CP classloaders are cached
+ assertNotNull(jarFileOnHDFS1 + " was not cached",
+ CoprocessorClassLoader.getIfCached(pathOnHDFS1));
+ assertNotNull(jarFileOnHDFS2 + " was not cached",
+ CoprocessorClassLoader.getIfCached(pathOnHDFS2));
+ // two external jars are used, so there should be one classloader per jar
+ assertEquals("The number of cached classloaders should be equal to the number" +
+ " of external jar files",
+ 2, CoprocessorClassLoader.getAllCached().size());
+ // check that the regions' active classloaders are shared across all regions on the RS
+ Set<ClassLoader> externalClassLoaders = new HashSet<ClassLoader>(
+ CoprocessorClassLoader.getAllCached());
+ for (Map.Entry<Region, Set<ClassLoader>> regionCP : regionsActiveClassLoaders.entrySet()) {
+ assertTrue("Some CP classloaders for region " + regionCP.getKey() + " are not cached."
+ + " ClassLoader Cache:" + externalClassLoaders
+ + " Region ClassLoaders:" + regionCP.getValue(),
+ externalClassLoaders.containsAll(regionCP.getValue()));
+ }
+ }
+
+ private String getLocalPath(File file) {
+ return new Path(file.toURI()).toString();
+ }
+
+ @Test
+ // HBASE-3516: Test CP Class loading from local file system
+ public void testClassLoadingFromLocalFS() throws Exception {
+ File jarFile = buildCoprocessorJar(cpName3);
+
+ // create a table that references the jar
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cpName3));
+ htd.addFamily(new HColumnDescriptor("test"));
+ htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName3 + "|" +
+ Coprocessor.PRIORITY_USER);
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ admin.createTable(htd);
+ waitForTable(htd.getTableName());
+
+ // verify that the coprocessor was loaded
+ boolean found = false;
+ MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+ for (Region region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ if (region.getRegionInfo().getRegionNameAsString().startsWith(cpName3)) {
+ found = (region.getCoprocessorHost().findCoprocessor(cpName3) != null);
+ }
+ }
+ assertTrue("Class " + cpName3 + " was missing on a region", found);
+ }
+
+ @Test
+ // HBASE-6308: Test CP classloader is the CoprocessorClassLoader
+ public void testPrivateClassLoader() throws Exception {
+ File jarFile = buildCoprocessorJar(cpName4);
+
+ // create a table that references the jar
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(cpName4));
+ htd.addFamily(new HColumnDescriptor("test"));
+ htd.setValue("COPROCESSOR$1", getLocalPath(jarFile) + "|" + cpName4 + "|" +
+ Coprocessor.PRIORITY_USER);
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ admin.createTable(htd);
+ waitForTable(htd.getTableName());
+
+ // verify that the coprocessor was loaded correctly
+ boolean found = false;
+ MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+ for (Region region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ if (region.getRegionInfo().getRegionNameAsString().startsWith(cpName4)) {
+ Coprocessor cp = region.getCoprocessorHost().findCoprocessor(cpName4);
+ if (cp != null) {
+ found = true;
+ assertEquals("Class " + cpName4 + " was not loaded by CoprocessorClassLoader",
+ cp.getClass().getClassLoader().getClass(), CoprocessorClassLoader.class);
+ }
+ }
+ }
+ assertTrue("Class " + cpName4 + " was missing on a region", found);
+ }
+
+ @Test
+ // HBASE-3810: Registering a Coprocessor at HTableDescriptor should be
+ // less strict
+ public void testHBase3810() throws Exception {
+ // allowed value pattern: [path] | class name | [priority] | [key values]
+
+ File jarFile1 = buildCoprocessorJar(cpName1);
+ File jarFile2 = buildCoprocessorJar(cpName2);
+ File jarFile5 = buildCoprocessorJar(cpName5);
+ File jarFile6 = buildCoprocessorJar(cpName6);
+
+ String cpKey1 = "COPROCESSOR$1";
+ String cpKey2 = " Coprocessor$2 ";
+ String cpKey3 = " coprocessor$03 ";
+
+ String cpValue1 = getLocalPath(jarFile1) + "|" + cpName1 + "|" +
+ Coprocessor.PRIORITY_USER;
+ String cpValue2 = getLocalPath(jarFile2) + " | " + cpName2 + " | ";
+ // load from default class loader
+ String cpValue3 =
+ " | org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver | | k=v ";
+
+ // create a table that references the jar
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor("test"));
+
+ // add 3 coprocessors by setting htd attributes directly.
+ htd.setValue(cpKey1, cpValue1);
+ htd.setValue(cpKey2, cpValue2);
+ htd.setValue(cpKey3, cpValue3);
+
+ // add 2 coprocessors using the htd.addCoprocessor() API
+ htd.addCoprocessor(cpName5, new Path(getLocalPath(jarFile5)),
+ Coprocessor.PRIORITY_USER, null);
+ Map<String, String> kvs = new HashMap<String, String>();
+ kvs.put("k1", "v1");
+ kvs.put("k2", "v2");
+ kvs.put("k3", "v3");
+ htd.addCoprocessor(cpName6, new Path(getLocalPath(jarFile6)),
+ Coprocessor.PRIORITY_USER, kvs);
+
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
+ }
+ admin.deleteTable(tableName);
+ }
+ admin.createTable(htd);
+ waitForTable(htd.getTableName());
+
+ // verify that the coprocessors were loaded
+ boolean found_2 = false, found_1 = false, found_3 = false,
+ found_5 = false, found_6 = false;
+ boolean found6_k1 = false, found6_k2 = false, found6_k3 = false,
+ found6_k4 = false;
+
+ MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+ for (Region region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ if (region.getRegionInfo().getRegionNameAsString().startsWith(tableName.getNameAsString())) {
+ found_1 = found_1 ||
+ (region.getCoprocessorHost().findCoprocessor(cpName1) != null);
+ found_2 = found_2 ||
+ (region.getCoprocessorHost().findCoprocessor(cpName2) != null);
+ found_3 = found_3 ||
+ (region.getCoprocessorHost().findCoprocessor("SimpleRegionObserver")
+ != null);
+ found_5 = found_5 ||
+ (region.getCoprocessorHost().findCoprocessor(cpName5) != null);
+
+ CoprocessorEnvironment env =
+ region.getCoprocessorHost().findCoprocessorEnvironment(cpName6);
+ if (env != null) {
+ found_6 = true;
+ Configuration conf = env.getConfiguration();
+ found6_k1 = conf.get("k1") != null;
+ found6_k2 = conf.get("k2") != null;
+ found6_k3 = conf.get("k3") != null;
+ }
+ }
+ }
+
+ assertTrue("Class " + cpName1 + " was missing on a region", found_1);
+ assertTrue("Class " + cpName2 + " was missing on a region", found_2);
+ assertTrue("Class SimpleRegionObserver was missing on a region", found_3);
+ assertTrue("Class " + cpName5 + " was missing on a region", found_5);
+ assertTrue("Class " + cpName6 + " was missing on a region", found_6);
+
+ assertTrue("Configuration key 'k1' was missing on a region", found6_k1);
+ assertTrue("Configuration key 'k2' was missing on a region", found6_k2);
+ assertTrue("Configuration key 'k3' was missing on a region", found6_k3);
+ assertFalse("Configuration key 'k4' should not have been configured", found6_k4);
+ }
+
+ @Test
+ public void testClassLoadingFromLibDirInJar() throws Exception {
+ loadingClassFromLibDirInJar("/lib/");
+ }
+
+ @Test
+ public void testClassLoadingFromRelativeLibDirInJar() throws Exception {
+ loadingClassFromLibDirInJar("lib/");
+ }
+
+ void loadingClassFromLibDirInJar(String libPrefix) throws Exception {
+ FileSystem fs = cluster.getFileSystem();
+
+ File innerJarFile1 = buildCoprocessorJar(cpName1);
+ File innerJarFile2 = buildCoprocessorJar(cpName2);
+ File outerJarFile = new File(TEST_UTIL.getDataTestDir().toString(), "outer.jar");
+
+ ClassLoaderTestHelper.addJarFilesToJar(
+ outerJarFile, libPrefix, innerJarFile1, innerJarFile2);
+
+ // copy the jars into dfs
+ fs.copyFromLocalFile(new Path(outerJarFile.getPath()),
+ new Path(fs.getUri().toString() + Path.SEPARATOR));
+ String jarFileOnHDFS = fs.getUri().toString() + Path.SEPARATOR +
+ outerJarFile.getName();
+ assertTrue("Copy jar file to HDFS failed.",
+ fs.exists(new Path(jarFileOnHDFS)));
+ LOG.info("Copied jar file to HDFS: " + jarFileOnHDFS);
+
+ // create a table that references the coprocessors
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+ htd.addFamily(new HColumnDescriptor("test"));
+ // without configuration values
+ htd.setValue("COPROCESSOR$1", jarFileOnHDFS.toString() + "|" + cpName1 +
+ "|" + Coprocessor.PRIORITY_USER);
+ // with configuration values
+ htd.setValue("COPROCESSOR$2", jarFileOnHDFS.toString() + "|" + cpName2 +
+ "|" + Coprocessor.PRIORITY_USER + "|k1=v1,k2=v2,k3=v3");
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ if (admin.tableExists(tableName)) {
+ if (admin.isTableEnabled(tableName)) {
+ admin.disableTable(tableName);
+ }
+ admin.deleteTable(tableName);
+ }
+ admin.createTable(htd);
+ waitForTable(htd.getTableName());
+
+ // verify that the coprocessors were loaded
+ boolean found1 = false, found2 = false, found2_k1 = false,
+ found2_k2 = false, found2_k3 = false;
+ MiniHBaseCluster hbase = TEST_UTIL.getHBaseCluster();
+ for (Region region: hbase.getRegionServer(0).getOnlineRegionsLocalContext()) {
+ if (region.getRegionInfo().getRegionNameAsString().startsWith(tableName.getNameAsString())) {
+ CoprocessorEnvironment env;
+ env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName1);
+ if (env != null) {
+ found1 = true;
+ }
+ env = region.getCoprocessorHost().findCoprocessorEnvironment(cpName2);
+ if (env != null) {
+ found2 = true;
+ Configuration conf = env.getConfiguration();
+ found2_k1 = conf.get("k1") != null;
+ found2_k2 = conf.get("k2") != null;
+ found2_k3 = conf.get("k3") != null;
+ }
+ }
+ }
+ assertTrue("Class " + cpName1 + " was missing on a region", found1);
+ assertTrue("Class " + cpName2 + " was missing on a region", found2);
+ assertTrue("Configuration key 'k1' was missing on a region", found2_k1);
+ assertTrue("Configuration key 'k2' was missing on a region", found2_k2);
+ assertTrue("Configuration key 'k3' was missing on a region", found2_k3);
+ }
+
+ @Test
+ public void testRegionServerCoprocessorsReported() throws Exception {
+ // This was a test for HBASE-4070.
+ // We are removing coprocessors from region load in HBASE-5258.
+ // Therefore, this test now only checks system coprocessors.
+ assertAllRegionServers(null);
+ }
+
+ /**
+ * Returns the subset of regionservers, as a map of ServerName to ServerLoad,
+ * that host at least one region of the given table. Used by
+ * assertAllRegionServers() below to test reporting of loaded coprocessors.
+ * @param tableName the table whose hosting servers to find.
+ * @return the matching subset of servers.
+ */
+ Map<ServerName, ServerLoad> serversForTable(String tableName) {
+ Map<ServerName, ServerLoad> serverLoadHashMap =
+ new HashMap<ServerName, ServerLoad>();
+ for (Map.Entry<ServerName, ServerLoad> server :
+ TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().
+ getOnlineServers().entrySet()) {
+ for (Map.Entry<byte[], RegionLoad> region :
+ server.getValue().getRegionsLoad().entrySet()) {
+ if (region.getValue().getNameAsString().equals(tableName)) {
+ // this server hosts a region of tableName: add this server..
+ serverLoadHashMap.put(server.getKey(), server.getValue());
+ // .. and skip the rest of the regions that it hosts.
+ break;
+ }
+ }
+ }
+ return serverLoadHashMap;
+ }
+
+ void assertAllRegionServers(String tableName) throws InterruptedException {
+ Map<ServerName, ServerLoad> servers;
+ String[] actualCoprocessors = null;
+ boolean success = false;
+ String[] expectedCoprocessors = regionServerSystemCoprocessors;
+ if (tableName == null) {
+ // if no tableName specified, use all servers.
+ servers = TEST_UTIL.getMiniHBaseCluster().getMaster().getServerManager().getOnlineServers();
+ } else {
+ servers = serversForTable(tableName);
+ }
+ for (int i = 0; i < 5; i++) {
+ boolean anyFailed = false;
+ for (Map.Entry<ServerName, ServerLoad> server : servers.entrySet()) {
+ actualCoprocessors = server.getValue().getRsCoprocessors();
+ if (!Arrays.equals(actualCoprocessors, expectedCoprocessors)) {
+ LOG.debug("failed comparison: actual: " +
+ Arrays.toString(actualCoprocessors) +
+ " ; expected: " + Arrays.toString(expectedCoprocessors));
+ anyFailed = true;
+ expectedCoprocessors = switchExpectedCoprocessors(expectedCoprocessors);
+ break;
+ }
+ expectedCoprocessors = switchExpectedCoprocessors(expectedCoprocessors);
+ }
+ if (!anyFailed) {
+ success = true;
+ break;
+ }
+ LOG.debug("retrying after failed comparison: " + i);
+ Thread.sleep(1000);
+ }
+ assertTrue(success);
+ }
+
+ private String[] switchExpectedCoprocessors(String[] expectedCoprocessors) {
+ if (Arrays.equals(regionServerSystemCoprocessors, expectedCoprocessors)) {
+ expectedCoprocessors = masterRegionServerSystemCoprocessors;
+ } else {
+ expectedCoprocessors = regionServerSystemCoprocessors;
+ }
+ return expectedCoprocessors;
+ }
+
+ @Test
+ public void testMasterCoprocessorsReported() {
+ // HBASE-4070: Improve region server metrics to report loaded coprocessors
+ // to master: verify that the master is reporting the correct set of
+ // loaded coprocessors.
+ final String loadedMasterCoprocessorsVerify =
+ "[" + masterCoprocessor.getSimpleName() + "]";
+ String loadedMasterCoprocessors =
+ java.util.Arrays.toString(
+ TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessors());
+ assertEquals(loadedMasterCoprocessorsVerify, loadedMasterCoprocessors);
+ }
+
+ @Test
+ public void testFindCoprocessors() {
+ // HBASE-12277:
+ CoprocessorHost masterCpHost =
+ TEST_UTIL.getHBaseCluster().getMaster().getMasterCoprocessorHost();
+
+ List<MasterObserver> masterObservers = masterCpHost.findCoprocessors(MasterObserver.class);
+
+ assertTrue(masterObservers != null && masterObservers.size() > 0);
+ assertEquals(masterCoprocessor.getSimpleName(),
+ masterObservers.get(0).getClass().getSimpleName());
+ }
+
+ private void waitForTable(TableName name) throws InterruptedException, IOException {
+ // First wait until all regions are online
+ TEST_UTIL.waitTableEnabled(name);
+ // Now wait a bit longer for the coprocessor hosts to load the CPs
+ Thread.sleep(1000);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
new file mode 100644
index 0000000..7e2577a
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
@@ -0,0 +1,349 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
+import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Test cases verifying coprocessor endpoints.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestCoprocessorEndpoint {
+ private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
+
+ private static final TableName TEST_TABLE =
+ TableName.valueOf("TestCoprocessorEndpoint");
+ private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+ private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+ private static byte[] ROW = Bytes.toBytes("testRow");
+
+ private static final int ROWSIZE = 20;
+ private static final int rowSeperator1 = 5;
+ private static final int rowSeperator2 = 12;
+ private static byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+ private static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ // set configuration to indicate which cps should be loaded
+ Configuration conf = util.getConfiguration();
+ conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 5000);
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName(),
+ ProtobufCoprocessorService.class.getName());
+ conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
+ ProtobufCoprocessorService.class.getName());
+ util.startMiniCluster(2);
+ Admin admin = util.getHBaseAdmin();
+ HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
+ desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+ admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+ util.waitUntilAllRegionsAssigned(TEST_TABLE);
+
+ Table table = util.getConnection().getTable(TEST_TABLE);
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+ table.put(put);
+ }
+ table.close();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ private Map<byte [], Long> sum(final Table table, final byte [] family,
+ final byte [] qualifier, final byte [] start, final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
+ start, end,
+ new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
+ @Override
+ public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
+ throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
+ ColumnAggregationProtos.SumRequest.Builder builder =
+ ColumnAggregationProtos.SumRequest.newBuilder();
+ builder.setFamily(ByteStringer.wrap(family));
+ if (qualifier != null && qualifier.length > 0) {
+ builder.setQualifier(ByteStringer.wrap(qualifier));
+ }
+ instance.sum(null, builder.build(), rpcCallback);
+ return rpcCallback.get().getSum();
+ }
+ });
+ }
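+ // Usage sketch: sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
+ // ROWS[ROWS.length - 1]) returns one partial sum per region covered by the
+ // row range; callers add the partials together (see testAggregation below).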
+
+ @Test
+ public void testAggregation() throws Throwable {
+ Table table = util.getConnection().getTable(TEST_TABLE);
+ Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+ ROWS[0], ROWS[ROWS.length-1]);
+ int sumResult = 0;
+ int expectedResult = 0;
+ for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+ sumResult += e.getValue();
+ }
+ for (int i = 0; i < ROWSIZE; i++) {
+ expectedResult += i;
+ }
+ assertEquals("Invalid result", expectedResult, sumResult);
+
+ results.clear();
+
+ // scan: for region 2 and region 3
+ results = sum(table, TEST_FAMILY, TEST_QUALIFIER,
+ ROWS[rowSeperator1], ROWS[ROWS.length-1]);
+ sumResult = 0;
+ expectedResult = 0;
+ for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+ sumResult += e.getValue();
+ }
+ for (int i = rowSeperator1; i < ROWSIZE; i++) {
+ expectedResult += i;
+ }
+ assertEquals("Invalid result", expectedResult, sumResult);
+ table.close();
+ }
+
+ @Test
+ public void testCoprocessorService() throws Throwable {
+ Table table = util.getConnection().getTable(TEST_TABLE);
+
+ List<HRegionLocation> regions;
+ try (RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
+ regions = rl.getAllRegionLocations();
+ }
+ final TestProtos.EchoRequestProto request =
+ TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+ final Map<byte[], String> results = Collections.synchronizedMap(
+ new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR));
+ try {
+ // scan: for all regions
+ final RpcController controller = new ServerRpcController();
+ table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+ ROWS[0], ROWS[ROWS.length - 1],
+ new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
+ public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+ throws IOException {
+ LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
+ CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>();
+ instance.echo(controller, request, callback);
+ TestProtos.EchoResponseProto response = callback.get();
+ LOG.debug("Batch.Call returning result " + response);
+ return response;
+ }
+ },
+ new Batch.Callback<TestProtos.EchoResponseProto>() {
+ public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
+ assertNotNull(result);
+ assertEquals("hello", result.getMessage());
+ results.put(region, result.getMessage());
+ }
+ }
+ );
+ for (Map.Entry<byte[], String> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+ }
+ assertEquals(3, results.size());
+ for (HRegionLocation info : regions) {
+ LOG.info("Region info is "+info.getRegionInfo().getRegionNameAsString());
+ assertTrue(results.containsKey(info.getRegionInfo().getRegionName()));
+ }
+ results.clear();
+
+ // scan: for region 2 and region 3
+ table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+ ROWS[rowSeperator1], ROWS[ROWS.length - 1],
+ new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, TestProtos.EchoResponseProto>() {
+ public TestProtos.EchoResponseProto call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+ throws IOException {
+ LOG.debug("Default response is " + TestProtos.EchoRequestProto.getDefaultInstance());
+ CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>();
+ instance.echo(controller, request, callback);
+ TestProtos.EchoResponseProto response = callback.get();
+ LOG.debug("Batch.Call returning result " + response);
+ return response;
+ }
+ },
+ new Batch.Callback<TestProtos.EchoResponseProto>() {
+ public void update(byte[] region, byte[] row, TestProtos.EchoResponseProto result) {
+ assertNotNull(result);
+ assertEquals("hello", result.getMessage());
+ results.put(region, result.getMessage());
+ }
+ }
+ );
+ for (Map.Entry<byte[], String> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+ }
+ assertEquals(2, results.size());
+ } finally {
+ table.close();
+ }
+ }
+
+ @Test
+ public void testCoprocessorServiceNullResponse() throws Throwable {
+ Table table = util.getConnection().getTable(TEST_TABLE);
+ List<HRegionLocation> regions;
+ try (RegionLocator rl = util.getConnection().getRegionLocator(TEST_TABLE)) {
+ regions = rl.getAllRegionLocations();
+ }
+
+ final TestProtos.EchoRequestProto request =
+ TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+ try {
+ // scan: for all regions
+ final RpcController controller = new ServerRpcController();
+ // test that null results are supported
+ Map<byte[], String> results =
+ table.coprocessorService(TestRpcServiceProtos.TestProtobufRpcProto.class,
+ ROWS[0], ROWS[ROWS.length - 1],
+ new Batch.Call<TestRpcServiceProtos.TestProtobufRpcProto, String>() {
+ public String call(TestRpcServiceProtos.TestProtobufRpcProto instance)
+ throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto> callback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<TestProtos.EchoResponseProto>();
+ instance.echo(controller, request, callback);
+ TestProtos.EchoResponseProto response = callback.get();
+ LOG.debug("Batch.Call got result " + response);
+ return null;
+ }
+ }
+ );
+ for (Map.Entry<byte[], String> e : results.entrySet()) {
+ LOG.info("Got value "+e.getValue()+" for region "+Bytes.toStringBinary(e.getKey()));
+ }
+ assertEquals(3, results.size());
+ for (HRegionLocation region : regions) {
+ HRegionInfo info = region.getRegionInfo();
+ LOG.info("Region info is "+info.getRegionNameAsString());
+ assertTrue(results.containsKey(info.getRegionName()));
+ assertNull(results.get(info.getRegionName()));
+ }
+ } finally {
+ table.close();
+ }
+ }
+
+ @Test
+ public void testMasterCoprocessorService() throws Throwable {
+ Admin admin = util.getHBaseAdmin();
+ final TestProtos.EchoRequestProto request =
+ TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+ TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
+ assertEquals("hello", service.echo(null, request).getMessage());
+ }
+
+ @Test
+ public void testCoprocessorError() throws Exception {
+ Configuration configuration = new Configuration(util.getConfiguration());
+ // Make it not retry forever
+ configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ Table table = util.getConnection().getTable(TEST_TABLE);
+
+ try {
+ CoprocessorRpcChannel protocol = table.coprocessorService(ROWS[0]);
+
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+ TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(protocol);
+
+ service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+ fail("Should have thrown an exception");
+ } catch (ServiceException e) {
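+ // expected: error() should raise a ServiceException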
+ } finally {
+ table.close();
+ }
+ }
+
+ @Test
+ public void testMasterCoprocessorError() throws Throwable {
+ Admin admin = util.getHBaseAdmin();
+ TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface service =
+ TestRpcServiceProtos.TestProtobufRpcProto.newBlockingStub(admin.coprocessorService());
+ try {
+ service.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
+ fail("Should have thrown an exception");
+ } catch (ServiceException e) {
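+ // expected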
+ }
+ }
+
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+ }
+ return ret;
+ }
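+ // e.g. makeN(Bytes.toBytes("testRow"), 3) yields the byte arrays for
+ // "testRow00", "testRow01", "testRow02".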
+
+}
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
new file mode 100644
index 0000000..4913acf
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
+
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestCoprocessorTableEndpoint {
+
+ private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+ private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+ private static final byte[] ROW = Bytes.toBytes("testRow");
+ private static final int ROWSIZE = 20;
+ private static final int rowSeperator1 = 5;
+ private static final int rowSeperator2 = 12;
+ private static final byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(2);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCoprocessorTableEndpoint() throws Throwable {
+ final TableName tableName = TableName.valueOf("testCoprocessorTableEndpoint");
+
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+ desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
+
+ createTable(desc);
+ verifyTable(tableName);
+ }
+
+ @Test
+ public void testDynamicCoprocessorTableEndpoint() throws Throwable {
+ final TableName tableName = TableName.valueOf("testDynamicCoprocessorTableEndpoint");
+
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+
+ createTable(desc);
+
+ desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
+ updateTable(desc);
+
+ verifyTable(tableName);
+ }
+
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+ }
+ return ret;
+ }
+
+ private static Map<byte [], Long> sum(final Table table, final byte [] family,
+ final byte [] qualifier, final byte [] start, final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
+ start, end,
+ new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
+ @Override
+ public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
+ throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
+ ColumnAggregationProtos.SumRequest.Builder builder =
+ ColumnAggregationProtos.SumRequest.newBuilder();
+ builder.setFamily(ByteString.copyFrom(family));
+ if (qualifier != null && qualifier.length > 0) {
+ builder.setQualifier(ByteString.copyFrom(qualifier));
+ }
+ instance.sum(null, builder.build(), rpcCallback);
+ return rpcCallback.get().getSum();
+ }
+ });
+ }
+
+ private static void createTable(HTableDescriptor desc) throws Exception {
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+ TEST_UTIL.waitUntilAllRegionsAssigned(desc.getTableName());
+ Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
+ try {
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+ table.put(put);
+ }
+ } finally {
+ table.close();
+ }
+ }
+
+ private static void updateTable(HTableDescriptor desc) throws Exception {
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ admin.disableTable(desc.getTableName());
+ admin.modifyTable(desc.getTableName(), desc);
+ admin.enableTable(desc.getTableName());
+ }
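+ // The disable/modify/enable cycle above closes and reopens the regions, which
+ // is what loads the endpoint added in testDynamicCoprocessorTableEndpoint.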
+
+ private static void verifyTable(TableName tableName) throws Throwable {
+ Table table = TEST_UTIL.getConnection().getTable(tableName);
+ try {
+ Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
+ ROWS[ROWS.length-1]);
+ int sumResult = 0;
+ int expectedResult = 0;
+ for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ sumResult += e.getValue();
+ }
+ for (int i = 0; i < ROWSIZE; i++) {
+ expectedResult += i;
+ }
+ assertEquals("Invalid result", expectedResult, sumResult);
+
+ // scan: for region 2 and region 3
+ results.clear();
+ results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length-1]);
+ sumResult = 0;
+ expectedResult = 0;
+ for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ sumResult += e.getValue();
+ }
+ for (int i = rowSeperator1; i < ROWSIZE; i++) {
+ expectedResult += i;
+ }
+ assertEquals("Invalid result", expectedResult, sumResult);
+ } finally {
+ table.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
new file mode 100644
index 0000000..31646f8
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorEndpoint.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.RemoteWithExtrasException;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestRegionServerCoprocessorEndpoint {
+ public static final FileNotFoundException WHAT_TO_THROW = new FileNotFoundException("/file.txt");
+ private static HBaseTestingUtility TEST_UTIL = null;
+ private static Configuration CONF = null;
+ private static final String DUMMY_VALUE = "val";
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ TEST_UTIL = new HBaseTestingUtility();
+ CONF = TEST_UTIL.getConfiguration();
+ CONF.setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
+ DummyRegionServerEndpoint.class.getName());
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testEndpoint() throws Exception {
+ final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+ final ServerRpcController controller = new ServerRpcController();
+ final CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>
+ rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>();
+ DummyRegionServerEndpointProtos.DummyService service =
+ ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class,
+ TEST_UTIL.getHBaseAdmin().coprocessorService(serverName));
+ service.dummyCall(controller,
+ DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback);
+ assertEquals(DUMMY_VALUE, rpcCallback.get().getValue());
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ }
+
+ @Test
+ public void testEndpointExceptions() throws Exception {
+ final ServerName serverName = TEST_UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+ final ServerRpcController controller = new ServerRpcController();
+ final CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>
+ rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<DummyRegionServerEndpointProtos.DummyResponse>();
+ DummyRegionServerEndpointProtos.DummyService service =
+ ProtobufUtil.newServiceStub(DummyRegionServerEndpointProtos.DummyService.class,
+ TEST_UTIL.getHBaseAdmin().coprocessorService(serverName));
+ service.dummyThrow(controller,
+ DummyRegionServerEndpointProtos.DummyRequest.getDefaultInstance(), rpcCallback);
+ assertEquals(null, rpcCallback.get());
+ assertTrue(controller.failedOnException());
+ assertEquals(WHAT_TO_THROW.getClass().getName().trim(),
+ ((RemoteWithExtrasException) controller.getFailedOn().getCause()).getClassName().trim());
+ }
+
+ static class DummyRegionServerEndpoint extends DummyService implements Coprocessor, SingletonCoprocessorService {
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ // nothing to do on start
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // nothing to do on stop
+ }
+
+ @Override
+ public void dummyCall(RpcController controller, DummyRequest request,
+ RpcCallback<DummyResponse> callback) {
+ callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
+ }
+
+ @Override
+ public void dummyThrow(RpcController controller,
+ DummyRequest request,
+ RpcCallback<DummyResponse> done) {
+ CoprocessorRpcUtils.setControllerException(controller, WHAT_TO_THROW);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
new file mode 100644
index 0000000..7cae0bc
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
+import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Verifies that RowProcessorEndpoint works.
+ * The tested RowProcessor performs two scans and a read-modify-write.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestRowProcessorEndpoint {
+
+ private static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);
+
+ private static final TableName TABLE = TableName.valueOf("testtable");
+ private final static byte[] ROW = Bytes.toBytes("testrow");
+ private final static byte[] ROW2 = Bytes.toBytes("testrow2");
+ private final static byte[] FAM = Bytes.toBytes("friendlist");
+
+ // Column names
+ private final static byte[] A = Bytes.toBytes("a");
+ private final static byte[] B = Bytes.toBytes("b");
+ private final static byte[] C = Bytes.toBytes("c");
+ private final static byte[] D = Bytes.toBytes("d");
+ private final static byte[] E = Bytes.toBytes("e");
+ private final static byte[] F = Bytes.toBytes("f");
+ private final static byte[] G = Bytes.toBytes("g");
+ private final static byte[] COUNTER = Bytes.toBytes("counter");
+ private final static AtomicLong myTimer = new AtomicLong(0);
+ private final AtomicInteger failures = new AtomicInteger(0);
+
+ private static HBaseTestingUtility util = new HBaseTestingUtility();
+ private static volatile int expectedCounter = 0;
+ private static int rowSize, row2Size;
+
+ private volatile static Table table = null;
+ private volatile static boolean swapped = false;
+ private volatile CountDownLatch startSignal;
+ private volatile CountDownLatch doneSignal;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration conf = util.getConfiguration();
+ conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ RowProcessorEndpoint.class.getName());
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
+ conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048);
+ util.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ public void prepareTestData() throws Exception {
+ try {
+ util.getHBaseAdmin().disableTable(TABLE);
+ util.getHBaseAdmin().deleteTable(TABLE);
+ } catch (Exception e) {
+ // ignore table not found
+ }
+ table = util.createTable(TABLE, FAM);
+ {
+ Put put = new Put(ROW);
+ put.addColumn(FAM, A, Bytes.add(B, C)); // B, C are friends of A
+ put.addColumn(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
+ put.addColumn(FAM, C, G); // G is a friend of C
+ table.put(put);
+ rowSize = put.size();
+ }
+ Put put = new Put(ROW2);
+ put.addColumn(FAM, D, E);
+ put.addColumn(FAM, F, G);
+ table.put(put);
+ row2Size = put.size();
+ }
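+
+ // Data written above, for reference (all cells in family "friendlist"):
+ //   testrow  : a="bc", b="def", c="g"  (each value lists a person's friends)
+ //   testrow2 : d="e", f="g"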
+
+ @Test
+ public void testDoubleScan() throws Throwable {
+ prepareTestData();
+
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+ RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
+ new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ ProcessResponse protoResult = service.process(null, request);
+ FriendsOfFriendsProcessorResponse response =
+ FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
+ Set<String> result = new HashSet<String>();
+ result.addAll(response.getResultList());
+ Set<String> expected =
+ new HashSet<String>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
+ Get get = new Get(ROW);
+ LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
+ assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReadModifyWrite() throws Throwable {
+ prepareTestData();
+ failures.set(0);
+ int numThreads = 100;
+ concurrentExec(new IncrementRunner(), numThreads);
+ Get get = new Get(ROW);
+ LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
+ int finalCounter = incrementCounter(table);
+ assertEquals(numThreads + 1, finalCounter);
+ assertEquals(0, failures.get());
+ }
+
+ class IncrementRunner implements Runnable {
+ @Override
+ public void run() {
+ try {
+ incrementCounter(table);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
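+ // One RowProcessor round trip: serialize the processor into a generic
+ // ProcessRequest, invoke the endpoint on the region holding ROW through
+ // the blocking stub, then parse the processor-specific response back out
+ // of the generic ProcessResponse.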
+ private int incrementCounter(Table table) throws Throwable {
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+ RowProcessorEndpoint.IncrementCounterProcessor processor =
+ new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ ProcessResponse protoResult = service.process(null, request);
+ IncCounterProcessorResponse response = IncCounterProcessorResponse
+ .parseFrom(protoResult.getRowProcessorResult());
+ Integer result = response.getResponse();
+ return result;
+ }
+
+ private void concurrentExec(
+ final Runnable task, final int numThreads) throws Throwable {
+ startSignal = new CountDownLatch(numThreads);
+ doneSignal = new CountDownLatch(numThreads);
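+ // Each worker counts startSignal down and then awaits it, so no task
+ // starts until all numThreads threads are ready; doneSignal lets this
+ // method block until the last task has finished.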
+ for (int i = 0; i < numThreads; ++i) {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ startSignal.countDown();
+ startSignal.await();
+ task.run();
+ } catch (Throwable e) {
+ failures.incrementAndGet();
+ e.printStackTrace();
+ }
+ doneSignal.countDown();
+ }
+ }).start();
+ }
+ doneSignal.await();
+ }
+
+ @Test
+ public void testMultipleRows() throws Throwable {
+ prepareTestData();
+ failures.set(0);
+ int numThreads = 100;
+ concurrentExec(new SwapRowsRunner(), numThreads);
+ LOG.debug("row keyvalues:" +
+ stringifyKvs(table.get(new Get(ROW)).listCells()));
+ LOG.debug("row2 keyvalues:" +
+ stringifyKvs(table.get(new Get(ROW2)).listCells()));
+ assertEquals(rowSize, table.get(new Get(ROW)).listCells().size());
+ assertEquals(row2Size, table.get(new Get(ROW2)).listCells().size());
+ assertEquals(0, failures.get());
+ }
+
+ class SwapRowsRunner implements Runnable {
+ @Override
+ public void run() {
+ try {
+ swapRows(table);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void swapRows(Table table) throws Throwable {
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+ RowProcessorEndpoint.RowSwapProcessor processor =
+ new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ service.process(null, request);
+ }
+
+ @Test
+ public void testTimeout() throws Throwable {
+ prepareTestData();
+ CoprocessorRpcChannel channel = table.coprocessorService(ROW);
+ RowProcessorEndpoint.TimeoutProcessor processor =
+ new RowProcessorEndpoint.TimeoutProcessor(ROW);
+ RowProcessorService.BlockingInterface service =
+ RowProcessorService.newBlockingStub(channel);
+ ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
+ boolean exceptionCaught = false;
+ try {
+ service.process(null, request);
+ } catch (Exception e) {
+ exceptionCaught = true;
+ }
+ assertTrue(exceptionCaught);
+ }
+
+ /**
+ * This class defines four RowProcessors: IncrementCounterProcessor,
+ * FriendsOfFriendsProcessor, RowSwapProcessor and TimeoutProcessor.
+ *
+ * We define the RowProcessors as inner classes of the endpoint so that
+ * they are loaded together with the endpoint by the coprocessor's class loader.
+ */
+ public static class RowProcessorEndpoint<S extends Message,T extends Message>
+ extends BaseRowProcessorEndpoint<S,T> implements CoprocessorService {
+ public static class IncrementCounterProcessor extends
+ BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
+ IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
+ int counter = 0;
+ byte[] row = new byte[0];
+
+ /**
+ * Empty constructor; the server re-creates the processor reflectively
+ * and then calls initialize()
+ */
+ IncrementCounterProcessor() {
+ }
+
+ IncrementCounterProcessor(byte[] row) {
+ this.row = row;
+ }
+
+ @Override
+ public Collection<byte[]> getRowsToLock() {
+ return Collections.singleton(row);
+ }
+
+ @Override
+ public IncCounterProcessorResponse getResult() {
+ IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
+ i.setResponse(counter);
+ return i.build();
+ }
+
+ @Override
+ public boolean readOnly() {
+ return false;
+ }
+
+ @Override
+ public void process(long now, HRegion region,
+ List<Mutation> mutations, WALEdit walEdit) throws IOException {
+ // Scan current counter
+ List<Cell> kvs = new ArrayList<Cell>();
+ Scan scan = new Scan(row, row);
+ scan.addColumn(FAM, COUNTER);
+ doScan(region, scan, kvs);
+ counter = kvs.size() == 0 ? 0 :
+ Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
+
+ // Assert counter value
+ assertEquals(expectedCounter, counter);
+
+ // Increment counter and send it to both memstore and wal edit
+ counter += 1;
+ expectedCounter += 1;
+
+
+ Put p = new Put(row);
+ KeyValue kv =
+ new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
+ p.add(kv);
+ mutations.add(p);
+ walEdit.add(kv);
+
+ // We can also inject some meta data to the walEdit
+ KeyValue metaKv = new KeyValue(
+ row, WALEdit.METAFAMILY,
+ Bytes.toBytes("I just increment counter"),
+ Bytes.toBytes(counter));
+ walEdit.add(metaKv);
+ }
+
+ @Override
+ public IncCounterProcessorRequest getRequestData() throws IOException {
+ IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
+ builder.setCounter(counter);
+ builder.setRow(ByteStringer.wrap(row));
+ return builder.build();
+ }
+
+ @Override
+ public void initialize(IncCounterProcessorRequest msg) {
+ this.row = msg.getRow().toByteArray();
+ this.counter = msg.getCounter();
+ }
+ }
+
+ public static class FriendsOfFriendsProcessor extends
+ BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
+ byte[] row = null;
+ byte[] person = null;
+ final Set<String> result = new HashSet<String>();
+
+ /**
+ * Empty constructor; the server re-creates the processor reflectively
+ * and then calls initialize()
+ */
+ FriendsOfFriendsProcessor() {
+ }
+
+ FriendsOfFriendsProcessor(byte[] row, byte[] person) {
+ this.row = row;
+ this.person = person;
+ }
+
+ @Override
+ public Collection<byte[]> getRowsToLock() {
+ return Collections.singleton(row);
+ }
+
+ @Override
+ public FriendsOfFriendsProcessorResponse getResult() {
+ FriendsOfFriendsProcessorResponse.Builder builder =
+ FriendsOfFriendsProcessorResponse.newBuilder();
+ builder.addAllResult(result);
+ return builder.build();
+ }
+
+ @Override
+ public boolean readOnly() {
+ return true;
+ }
+
+ @Override
+ public void process(long now, HRegion region,
+ List<Mutation> mutations, WALEdit walEdit) throws IOException {
+ List<Cell> kvs = new ArrayList<Cell>();
+ { // First scan to get friends of the person
+ Scan scan = new Scan(row, row);
+ scan.addColumn(FAM, person);
+ doScan(region, scan, kvs);
+ }
+
+ // Second scan to get friends of friends
+ Scan scan = new Scan(row, row);
+ for (Cell kv : kvs) {
+ byte[] friends = CellUtil.cloneValue(kv);
+ for (byte f : friends) {
+ scan.addColumn(FAM, new byte[]{f});
+ }
+ }
+ doScan(region, scan, kvs);
+
+ // Collect result
+ result.clear();
+ for (Cell kv : kvs) {
+ for (byte b : CellUtil.cloneValue(kv)) {
+ result.add((char)b + "");
+ }
+ }
+ }
+
+ @Override
+ public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
+ FriendsOfFriendsProcessorRequest.Builder builder =
+ FriendsOfFriendsProcessorRequest.newBuilder();
+ builder.setPerson(ByteStringer.wrap(person));
+ builder.setRow(ByteStringer.wrap(row));
+ builder.addAllResult(result);
+ FriendsOfFriendsProcessorRequest f = builder.build();
+ return f;
+ }
+
+ @Override
+ public void initialize(FriendsOfFriendsProcessorRequest request)
+ throws IOException {
+ this.person = request.getPerson().toByteArray();
+ this.row = request.getRow().toByteArray();
+ result.clear();
+ result.addAll(request.getResultList());
+ }
+ }
+
+ public static class RowSwapProcessor extends
+ BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
+ byte[] row1 = new byte[0];
+ byte[] row2 = new byte[0];
+
+ /**
+ * Empty constructor; the server re-creates the processor reflectively
+ * and then calls initialize()
+ */
+ RowSwapProcessor() {
+ }
+
+ RowSwapProcessor(byte[] row1, byte[] row2) {
+ this.row1 = row1;
+ this.row2 = row2;
+ }
+
+ @Override
+ public Collection<byte[]> getRowsToLock() {
+ List<byte[]> rows = new ArrayList<byte[]>();
+ rows.add(row1);
+ rows.add(row2);
+ return rows;
+ }
+
+ @Override
+ public boolean readOnly() {
+ return false;
+ }
+
+ @Override
+ public RowSwapProcessorResponse getResult() {
+ return RowSwapProcessorResponse.getDefaultInstance();
+ }
+
+ @Override
+ public void process(long now, HRegion region,
+ List<Mutation> mutations, WALEdit walEdit) throws IOException {
+
+ // Override the time to avoid a race condition in the unit test caused by
+ // an inaccurate timer on some machines
+ now = myTimer.getAndIncrement();
+
+ // Scan both rows
+ List<Cell> kvs1 = new ArrayList<Cell>();
+ List<Cell> kvs2 = new ArrayList<Cell>();
+ doScan(region, new Scan(row1, row1), kvs1);
+ doScan(region, new Scan(row2, row2), kvs2);
+
+ // Assert swapped
+ if (swapped) {
+ assertEquals(rowSize, kvs2.size());
+ assertEquals(row2Size, kvs1.size());
+ } else {
+ assertEquals(rowSize, kvs1.size());
+ assertEquals(row2Size, kvs2.size());
+ }
+ swapped = !swapped;
+
+ // Add and delete keyvalues
+ List<List<Cell>> kvs = new ArrayList<List<Cell>>();
+ kvs.add(kvs1);
+ kvs.add(kvs2);
+ byte[][] rows = new byte[][]{row1, row2};
+ for (int i = 0; i < kvs.size(); ++i) {
+ for (Cell kv : kvs.get(i)) {
+ // Delete from the current row and add to the other row
+ Delete d = new Delete(rows[i]);
+ KeyValue kvDelete =
+ new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+ kv.getTimestamp(), KeyValue.Type.Delete);
+ d.addDeleteMarker(kvDelete);
+ Put p = new Put(rows[1 - i]);
+ KeyValue kvAdd =
+ new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+ now, CellUtil.cloneValue(kv));
+ p.add(kvAdd);
+ mutations.add(d);
+ walEdit.add(kvDelete);
+ mutations.add(p);
+ walEdit.add(kvAdd);
+ }
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "swap";
+ }
+
+ @Override
+ public RowSwapProcessorRequest getRequestData() throws IOException {
+ RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
+ builder.setRow1(ByteStringer.wrap(row1));
+ builder.setRow2(ByteStringer.wrap(row2));
+ return builder.build();
+ }
+
+ @Override
+ public void initialize(RowSwapProcessorRequest msg) {
+ this.row1 = msg.getRow1().toByteArray();
+ this.row2 = msg.getRow2().toByteArray();
+ }
+ }
+
+ public static class TimeoutProcessor extends
+ BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
+
+ byte[] row = new byte[0];
+
+ /**
+ * Empty constructor; the server re-creates the processor reflectively
+ * and then calls initialize()
+ */
+ public TimeoutProcessor() {
+ }
+
+ public TimeoutProcessor(byte[] row) {
+ this.row = row;
+ }
+
+ @Override
+ public Collection<byte[]> getRowsToLock() {
+ return Collections.singleton(row);
+ }
+
+ @Override
+ public TimeoutProcessorResponse getResult() {
+ return TimeoutProcessorResponse.getDefaultInstance();
+ }
+
+ @Override
+ public void process(long now, HRegion region,
+ List<Mutation> mutations, WALEdit walEdit) throws IOException {
+ try {
+ // Sleep for a long time so the call times out
+ Thread.sleep(100 * 1000L);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public boolean readOnly() {
+ return true;
+ }
+
+ @Override
+ public String getName() {
+ return "timeout";
+ }
+
+ @Override
+ public TimeoutProcessorRequest getRequestData() throws IOException {
+ TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
+ builder.setRow(ByteStringer.wrap(row));
+ return builder.build();
+ }
+
+ @Override
+ public void initialize(TimeoutProcessorRequest msg) throws IOException {
+ this.row = msg.getRow().toByteArray();
+ }
+ }
+
+ public static void doScan(
+ HRegion region, Scan scan, List<Cell> result) throws IOException {
+ InternalScanner scanner = null;
+ try {
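+ // READ_UNCOMMITTED so the scan does not wait on the MVCC read point;
+ // these scans run inside the row-processor write path, under row locks.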
+ scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+ scanner = region.getScanner(scan);
+ result.clear();
+ scanner.next(result);
+ } finally {
+ if (scanner != null) scanner.close();
+ }
+ }
+ }
+
+ static String stringifyKvs(Collection<Cell> kvs) {
+ StringBuilder out = new StringBuilder();
+ out.append("[");
+ if (kvs != null) {
+ for (Cell kv : kvs) {
+ byte[] col = CellUtil.cloneQualifier(kv);
+ byte[] val = CellUtil.cloneValue(kv);
+ if (Bytes.equals(col, COUNTER)) {
+ out.append(Bytes.toStringBinary(col) + ":" +
+ Bytes.toInt(val) + " ");
+ } else {
+ out.append(Bytes.toStringBinary(col) + ":" +
+ Bytes.toStringBinary(val) + " ");
+ }
+ }
+ }
+ out.append("]");
+ return out.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java
new file mode 100644
index 0000000..15a2747
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/ipc/TestCoprocessorRpcUtils.java
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.Descriptors;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCoprocessorRpcUtils {
+ @Test
+ public void testServiceName() throws Exception {
+ // verify that we de-namespace built-in HBase RPC services
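+ // e.g. a fully-qualified name like "hbase.pb.AuthenticationService" is
+ // expected to collapse to "AuthenticationService" (the exact prefix
+ // depends on the package declared in the .proto file)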
+ Descriptors.ServiceDescriptor authService =
+ AuthenticationProtos.AuthenticationService.getDescriptor();
+ assertEquals(authService.getName(), CoprocessorRpcUtils.getServiceName(authService));
+
+ // non-hbase rpc services should remain fully qualified
+ Descriptors.ServiceDescriptor dummyService =
+ DummyRegionServerEndpointProtos.DummyService.getDescriptor();
+ assertEquals(dummyService.getFullName(), CoprocessorRpcUtils.getServiceName(dummyService));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
new file mode 100644
index 0000000..a82900d
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
+
+/**
+ * Client proxy for SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint
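+ *
+ * <p>A minimal usage sketch (the {@code table}, {@code familyPaths},
+ * {@code userToken} and {@code startRow} values are illustrative):
+ * <pre>
+ * SecureBulkLoadEndpointClient client = new SecureBulkLoadEndpointClient(table);
+ * String bulkToken = client.prepareBulkLoad(table.getName());
+ * try {
+ *   client.bulkLoadHFiles(familyPaths, userToken, bulkToken, startRow);
+ * } finally {
+ *   client.cleanupBulkLoad(bulkToken);
+ * }
+ * </pre>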
+ * @deprecated Use for backward compatibility testing only. Will be removed
+ * once SecureBulkLoadEndpoint is no longer supported.
+ */
+@InterfaceAudience.Private
+public class SecureBulkLoadEndpointClient {
+ private Table table;
+
+ public SecureBulkLoadEndpointClient(Table table) {
+ this.table = table;
+ }
+
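+ // The three operations below share one pattern: obtain a
+ // CoprocessorRpcChannel for the region containing a row, build a
+ // SecureBulkLoadService stub over it, invoke the stub with a
+ // ServerRpcController plus a BlockingRpcCallback, and rethrow any
+ // server-side exception captured by the controller.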
+ public String prepareBulkLoad(final TableName tableName) throws IOException {
+ try {
+ CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
+ SecureBulkLoadProtos.SecureBulkLoadService instance =
+ ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
+
+ ServerRpcController controller = new ServerRpcController();
+
+ CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PrepareBulkLoadResponse>();
+
+ PrepareBulkLoadRequest request =
+ PrepareBulkLoadRequest.newBuilder()
+ .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
+
+ instance.prepareBulkLoad(controller, request, rpcCallback);
+
+ PrepareBulkLoadResponse response = rpcCallback.get();
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+
+ return response.getBulkToken();
+ } catch (Throwable throwable) {
+ throw new IOException(throwable);
+ }
+ }
+
+ public void cleanupBulkLoad(final String bulkToken) throws IOException {
+ try {
+ CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
+ SecureBulkLoadProtos.SecureBulkLoadService instance =
+ ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
+
+ ServerRpcController controller = new ServerRpcController();
+
+ CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<CleanupBulkLoadResponse>();
+
+ CleanupBulkLoadRequest request =
+ CleanupBulkLoadRequest.newBuilder()
+ .setBulkToken(bulkToken).build();
+
+ instance.cleanupBulkLoad(controller,
+ request,
+ rpcCallback);
+
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ } catch (Throwable throwable) {
+ throw new IOException(throwable);
+ }
+ }
+
+ public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
+ final Token<?> userToken,
+ final String bulkToken,
+ final byte[] startRow) throws IOException {
+ // we never want to send a batch of HFiles to all regions, thus cannot call
+ // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
+ try {
+ CoprocessorRpcChannel channel = table.coprocessorService(startRow);
+ SecureBulkLoadProtos.SecureBulkLoadService instance =
+ ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
+
+ DelegationToken protoDT =
+ DelegationToken.newBuilder().build();
+ if (userToken != null) {
+ protoDT =
+ DelegationToken.newBuilder()
+ .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+ .setPassword(ByteStringer.wrap(userToken.getPassword()))
+ .setKind(userToken.getKind().toString())
+ .setService(userToken.getService().toString()).build();
+ }
+
+ List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
+ new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
+ for (Pair<byte[], String> el : familyPaths) {
+ protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
+ .setFamily(ByteStringer.wrap(el.getFirst()))
+ .setPath(el.getSecond()).build());
+ }
+
+ SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
+ SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
+ .setFsToken(protoDT)
+ .addAllFamilyPath(protoFamilyPaths)
+ .setBulkToken(bulkToken).build();
+
+ ServerRpcController controller = new ServerRpcController();
+ CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>
+ rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
+ instance.secureBulkLoadHFiles(controller,
+ request,
+ rpcCallback);
+
+ SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
+ if (controller.failedOnException()) {
+ throw controller.getFailedOn();
+ }
+ return response.getLoaded();
+ } catch (Throwable throwable) {
+ throw new IOException(throwable);
+ }
+ }
+
+}