You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/10/04 05:16:42 UTC
[29/51] [partial] hbase git commit: HBASE-15638 Shade protobuf Which
includes
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
new file mode 100644
index 0000000..7ef9b9c
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests bulk loading of HFiles with old secure Endpoint client for backward compatibility. Will be
+ * removed when old non-secure client for backward compatibility is not supported.
+ */
+@RunWith(Parameterized.class)
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionServerBulkLoad {
+ public TestHRegionServerBulkLoadWithOldSecureEndpoint(int duration) {
+ super(duration);
+ }
+
+ private static final Log LOG =
+ LogFactory.getLog(TestHRegionServerBulkLoadWithOldSecureEndpoint.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws IOException {
+ conf.setInt("hbase.rpc.timeout", 10 * 1000);
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+ }
+
+ public static class AtomicHFileLoader extends RepeatingTestThread {
+ final AtomicLong numBulkLoads = new AtomicLong();
+ final AtomicLong numCompactions = new AtomicLong();
+ private TableName tableName;
+
+ public AtomicHFileLoader(TableName tableName, TestContext ctx,
+ byte targetFamilies[][]) throws IOException {
+ super(ctx);
+ this.tableName = tableName;
+ }
+
+ public void doAnAction() throws Exception {
+ long iteration = numBulkLoads.getAndIncrement();
+ Path dir = UTIL.getDataTestDirOnTestFS(String.format("bulkLoad_%08d",
+ iteration));
+
+ // create HFiles for different column families
+ FileSystem fs = UTIL.getTestFileSystem();
+ byte[] val = Bytes.toBytes(String.format("%010d", iteration));
+ final List<Pair<byte[], String>> famPaths = new ArrayList<Pair<byte[], String>>(
+ NUM_CFS);
+ for (int i = 0; i < NUM_CFS; i++) {
+ Path hfile = new Path(dir, family(i));
+ byte[] fam = Bytes.toBytes(family(i));
+ createHFile(fs, hfile, fam, QUAL, val, 1000);
+ famPaths.add(new Pair<>(fam, hfile.toString()));
+ }
+
+ // bulk load HFiles
+ final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection();
+ Table table = conn.getTable(tableName);
+ final String bulkToken = new SecureBulkLoadEndpointClient(table).prepareBulkLoad(tableName);
+ RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
+ ClientServiceCallable<Void> callable =
+ new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
+ rpcControllerFactory.newController()) {
+ @Override
+ protected Void rpcCall() throws Exception {
+ LOG.debug("Going to connect to server " + getLocation() + " for row " +
+ Bytes.toStringBinary(getRow()));
+ try (Table table = conn.getTable(getTableName())) {
+ boolean loaded = new SecureBulkLoadEndpointClient(table).bulkLoadHFiles(famPaths,
+ null, bulkToken, getLocation().getRegionInfo().getStartKey());
+ }
+ return null;
+ }
+ };
+ RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+ caller.callWithRetries(callable, Integer.MAX_VALUE);
+
+ // Periodically do compaction to reduce the number of open file handles.
+ if (numBulkLoads.get() % 5 == 0) {
+ // 5 * 50 = 250 open file handles!
+ callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
+ rpcControllerFactory.newController()) {
+ @Override
+ protected Void rpcCall() throws Exception {
+ LOG.debug("compacting " + getLocation() + " for row "
+ + Bytes.toStringBinary(getRow()));
+ AdminProtos.AdminService.BlockingInterface server =
+ conn.getAdmin(getLocation().getServerName());
+ CompactRegionRequest request =
+ RequestConverter.buildCompactRegionRequest(
+ getLocation().getRegionInfo().getRegionName(), true, null);
+ server.compactRegion(null, request);
+ numCompactions.incrementAndGet();
+ return null;
+ }
+ };
+ caller.callWithRetries(callable, Integer.MAX_VALUE);
+ }
+ }
+ }
+
+ void runAtomicBulkloadTest(TableName tableName, int millisToRun, int numScanners)
+ throws Exception {
+ setupTable(tableName, 10);
+
+ TestContext ctx = new TestContext(UTIL.getConfiguration());
+
+ AtomicHFileLoader loader = new AtomicHFileLoader(tableName, ctx, null);
+ ctx.addThread(loader);
+
+ List<AtomicScanReader> scanners = Lists.newArrayList();
+ for (int i = 0; i < numScanners; i++) {
+ AtomicScanReader scanner = new AtomicScanReader(tableName, ctx, families);
+ scanners.add(scanner);
+ ctx.addThread(scanner);
+ }
+
+ ctx.startThreads();
+ ctx.waitFor(millisToRun);
+ ctx.stop();
+
+ LOG.info("Loaders:");
+ LOG.info(" loaded " + loader.numBulkLoads.get());
+ LOG.info(" compations " + loader.numCompactions.get());
+
+ LOG.info("Scanners:");
+ for (AtomicScanReader scanner : scanners) {
+ LOG.info(" scanned " + scanner.numScans.get());
+ LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
new file mode 100644
index 0000000..9bff701
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestServerCustomProtocol.java
@@ -0,0 +1,480 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.CountResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.HelloResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.IncrementCountResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.NoopResponse;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingRequest;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.PingProtos.PingResponse;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+@Category({RegionServerTests.class, MediumTests.class})
+public class TestServerCustomProtocol {
+ private static final Log LOG = LogFactory.getLog(TestServerCustomProtocol.class);
+ static final String WHOAREYOU = "Who are you?";
+ static final String NOBODY = "nobody";
+ static final String HELLO = "Hello, ";
+
+ /* Test protocol implementation */
+ public static class PingHandler extends PingProtos.PingService
+ implements Coprocessor, CoprocessorService {
+ private int counter = 0;
+
+ @Override
+ public void start(CoprocessorEnvironment env) throws IOException {
+ if (env instanceof RegionCoprocessorEnvironment) return;
+ throw new CoprocessorException("Must be loaded on a table region!");
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) throws IOException {
+ // Nothing to do.
+ }
+
+ @Override
+ public void ping(RpcController controller, PingRequest request,
+ RpcCallback<PingResponse> done) {
+ this.counter++;
+ done.run(PingResponse.newBuilder().setPong("pong").build());
+ }
+
+ @Override
+ public void count(RpcController controller, CountRequest request,
+ RpcCallback<CountResponse> done) {
+ done.run(CountResponse.newBuilder().setCount(this.counter).build());
+ }
+
+ @Override
+ public void increment(RpcController controller,
+ IncrementCountRequest request, RpcCallback<IncrementCountResponse> done) {
+ this.counter += request.getDiff();
+ done.run(IncrementCountResponse.newBuilder().setCount(this.counter).build());
+ }
+
+ @Override
+ public void hello(RpcController controller, HelloRequest request,
+ RpcCallback<HelloResponse> done) {
+ if (!request.hasName()) done.run(HelloResponse.newBuilder().setResponse(WHOAREYOU).build());
+ else if (request.getName().equals(NOBODY)) done.run(HelloResponse.newBuilder().build());
+ else done.run(HelloResponse.newBuilder().setResponse(HELLO + request.getName()).build());
+ }
+
+ @Override
+ public void noop(RpcController controller, NoopRequest request,
+ RpcCallback<NoopResponse> done) {
+ done.run(NoopResponse.newBuilder().build());
+ }
+
+ @Override
+ public Service getService() {
+ return this;
+ }
+ }
+
+ private static final TableName TEST_TABLE = TableName.valueOf("test");
+ private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
+
+ private static final byte[] ROW_A = Bytes.toBytes("aaa");
+ private static final byte[] ROW_B = Bytes.toBytes("bbb");
+ private static final byte[] ROW_C = Bytes.toBytes("ccc");
+
+ private static final byte[] ROW_AB = Bytes.toBytes("abb");
+ private static final byte[] ROW_BC = Bytes.toBytes("bcc");
+
+ private static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ PingHandler.class.getName());
+ util.startMiniCluster();
+ }
+
+ @Before
+ public void before() throws Exception {
+ final byte[][] SPLIT_KEYS = new byte[][] { ROW_B, ROW_C };
+ Table table = util.createTable(TEST_TABLE, TEST_FAMILY, SPLIT_KEYS);
+
+ Put puta = new Put( ROW_A );
+ puta.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
+ table.put(puta);
+
+ Put putb = new Put( ROW_B );
+ putb.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
+ table.put(putb);
+
+ Put putc = new Put( ROW_C );
+ putc.addColumn(TEST_FAMILY, Bytes.toBytes("col1"), Bytes.toBytes(1));
+ table.put(putc);
+ }
+
+ @After
+ public void after() throws Exception {
+ util.deleteTable(TEST_TABLE);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testSingleProxy() throws Throwable {
+ Table table = util.getConnection().getTable(TEST_TABLE);
+ Map<byte [], String> results = ping(table, null, null);
+ // There are three regions so should get back three results.
+ assertEquals(3, results.size());
+ for (Map.Entry<byte [], String> e: results.entrySet()) {
+ assertEquals("Invalid custom protocol response", "pong", e.getValue());
+ }
+ hello(table, "George", HELLO + "George");
+ LOG.info("Did george");
+ hello(table, null, "Who are you?");
+ LOG.info("Who are you");
+ hello(table, NOBODY, null);
+ LOG.info(NOBODY);
+ Map<byte [], Integer> intResults = table.coprocessorService(PingProtos.PingService.class,
+ null, null,
+ new Batch.Call<PingProtos.PingService, Integer>() {
+ @Override
+ public Integer call(PingProtos.PingService instance) throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.CountResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.CountResponse>();
+ instance.count(null, PingProtos.CountRequest.newBuilder().build(), rpcCallback);
+ return rpcCallback.get().getCount();
+ }
+ });
+ int count = -1;
+ for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
+ assertTrue(e.getValue() > 0);
+ count = e.getValue();
+ }
+ final int diff = 5;
+ intResults = table.coprocessorService(PingProtos.PingService.class,
+ null, null,
+ new Batch.Call<PingProtos.PingService, Integer>() {
+ @Override
+ public Integer call(PingProtos.PingService instance) throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.IncrementCountResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.IncrementCountResponse>();
+ instance.increment(null,
+ PingProtos.IncrementCountRequest.newBuilder().setDiff(diff).build(),
+ rpcCallback);
+ return rpcCallback.get().getCount();
+ }
+ });
+ // There are three regions so should get back three results.
+ assertEquals(3, results.size());
+ for (Map.Entry<byte [], Integer> e: intResults.entrySet()) {
+ assertEquals(e.getValue().intValue(), count + diff);
+ }
+ table.close();
+ }
+
+ private Map<byte [], String> hello(final Table table, final String send, final String response)
+ throws ServiceException, Throwable {
+ Map<byte [], String> results = hello(table, send);
+ for (Map.Entry<byte [], String> e: results.entrySet()) {
+ assertEquals("Invalid custom protocol response", response, e.getValue());
+ }
+ return results;
+ }
+
+ private Map<byte [], String> hello(final Table table, final String send)
+ throws ServiceException, Throwable {
+ return hello(table, send, null, null);
+ }
+
+ private Map<byte [], String> hello(final Table table, final String send, final byte [] start,
+ final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(PingProtos.PingService.class,
+ start, end,
+ new Batch.Call<PingProtos.PingService, String>() {
+ @Override
+ public String call(PingProtos.PingService instance) throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse>();
+ PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
+ if (send != null) builder.setName(send);
+ instance.hello(null, builder.build(), rpcCallback);
+ PingProtos.HelloResponse r = rpcCallback.get();
+ return r != null && r.hasResponse()? r.getResponse(): null;
+ }
+ });
+ }
+
+ private Map<byte [], String> compoundOfHelloAndPing(final Table table, final byte [] start,
+ final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(PingProtos.PingService.class,
+ start, end,
+ new Batch.Call<PingProtos.PingService, String>() {
+ @Override
+ public String call(PingProtos.PingService instance) throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.HelloResponse>();
+ PingProtos.HelloRequest.Builder builder = PingProtos.HelloRequest.newBuilder();
+ // Call ping on same instance. Use result calling hello on same instance.
+ builder.setName(doPing(instance));
+ instance.hello(null, builder.build(), rpcCallback);
+ PingProtos.HelloResponse r = rpcCallback.get();
+ return r != null && r.hasResponse()? r.getResponse(): null;
+ }
+ });
+ }
+
+ private Map<byte [], String> noop(final Table table, final byte [] start,
+ final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(PingProtos.PingService.class, start, end,
+ new Batch.Call<PingProtos.PingService, String>() {
+ @Override
+ public String call(PingProtos.PingService instance) throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.NoopResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.NoopResponse>();
+ PingProtos.NoopRequest.Builder builder = PingProtos.NoopRequest.newBuilder();
+ instance.noop(null, builder.build(), rpcCallback);
+ rpcCallback.get();
+ // Looks like null is expected when void. That is what the test below is looking for
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testSingleMethod() throws Throwable {
+ try (Table table = util.getConnection().getTable(TEST_TABLE);
+ RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
+ Map<byte [], String> results = table.coprocessorService(PingProtos.PingService.class,
+ null, ROW_A,
+ new Batch.Call<PingProtos.PingService, String>() {
+ @Override
+ public String call(PingProtos.PingService instance) throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse>();
+ instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
+ return rpcCallback.get().getPong();
+ }
+ });
+ // Should have gotten results for 1 of the three regions only since we specified
+ // rows from 1 region
+ assertEquals(1, results.size());
+ verifyRegionResults(locator, results, ROW_A);
+
+ final String name = "NAME";
+ results = hello(table, name, null, ROW_A);
+ // Should have gotten results for 1 of the three regions only since we specified
+ // rows from 1 region
+ assertEquals(1, results.size());
+ verifyRegionResults(locator, results, "Hello, NAME", ROW_A);
+ }
+ }
+
+ @Test
+ public void testRowRange() throws Throwable {
+ try (Table table = util.getConnection().getTable(TEST_TABLE);
+ RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
+ for (HRegionLocation e: locator.getAllRegionLocations()) {
+ LOG.info("Region " + e.getRegionInfo().getRegionNameAsString()
+ + ", servername=" + e.getServerName());
+ }
+ // Here are what regions looked like on a run:
+ //
+ // test,,1355943549657.c65d4822d8bdecc033a96451f3a0f55d.
+ // test,bbb,1355943549661.110393b070dd1ed93441e0bc9b3ffb7e.
+ // test,ccc,1355943549665.c3d6d125141359cbbd2a43eaff3cdf74.
+
+ Map<byte [], String> results = ping(table, null, ROW_A);
+ // Should contain first region only.
+ assertEquals(1, results.size());
+ verifyRegionResults(locator, results, ROW_A);
+
+ // Test start row + empty end
+ results = ping(table, ROW_BC, null);
+ assertEquals(2, results.size());
+ // should contain last 2 regions
+ HRegionLocation loc = locator.getRegionLocation(ROW_A, true);
+ assertNull("Should be missing region for row aaa (prior to start row)",
+ results.get(loc.getRegionInfo().getRegionName()));
+ verifyRegionResults(locator, results, ROW_B);
+ verifyRegionResults(locator, results, ROW_C);
+
+ // test empty start + end
+ results = ping(table, null, ROW_BC);
+ // should contain the first 2 regions
+ assertEquals(2, results.size());
+ verifyRegionResults(locator, results, ROW_A);
+ verifyRegionResults(locator, results, ROW_B);
+ loc = locator.getRegionLocation(ROW_C, true);
+ assertNull("Should be missing region for row ccc (past stop row)",
+ results.get(loc.getRegionInfo().getRegionName()));
+
+ // test explicit start + end
+ results = ping(table, ROW_AB, ROW_BC);
+ // should contain first 2 regions
+ assertEquals(2, results.size());
+ verifyRegionResults(locator, results, ROW_A);
+ verifyRegionResults(locator, results, ROW_B);
+ loc = locator.getRegionLocation(ROW_C, true);
+ assertNull("Should be missing region for row ccc (past stop row)",
+ results.get(loc.getRegionInfo().getRegionName()));
+
+ // test single region
+ results = ping(table, ROW_B, ROW_BC);
+ // should only contain region bbb
+ assertEquals(1, results.size());
+ verifyRegionResults(locator, results, ROW_B);
+ loc = locator.getRegionLocation(ROW_A, true);
+ assertNull("Should be missing region for row aaa (prior to start)",
+ results.get(loc.getRegionInfo().getRegionName()));
+ loc = locator.getRegionLocation(ROW_C, true);
+ assertNull("Should be missing region for row ccc (past stop row)",
+ results.get(loc.getRegionInfo().getRegionName()));
+ }
+ }
+
+ private Map<byte [], String> ping(final Table table, final byte [] start, final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(PingProtos.PingService.class, start, end,
+ new Batch.Call<PingProtos.PingService, String>() {
+ @Override
+ public String call(PingProtos.PingService instance) throws IOException {
+ return doPing(instance);
+ }
+ });
+ }
+
+ private static String doPing(PingProtos.PingService instance) throws IOException {
+ CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse> rpcCallback =
+ new CoprocessorRpcUtils.BlockingRpcCallback<PingProtos.PingResponse>();
+ instance.ping(null, PingProtos.PingRequest.newBuilder().build(), rpcCallback);
+ return rpcCallback.get().getPong();
+ }
+
+ @Test
+ public void testCompoundCall() throws Throwable {
+ try (Table table = util.getConnection().getTable(TEST_TABLE);
+ RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
+ Map<byte [], String> results = compoundOfHelloAndPing(table, ROW_A, ROW_C);
+ verifyRegionResults(locator, results, "Hello, pong", ROW_A);
+ verifyRegionResults(locator, results, "Hello, pong", ROW_B);
+ verifyRegionResults(locator, results, "Hello, pong", ROW_C);
+ }
+ }
+
+ @Test
+ public void testNullCall() throws Throwable {
+ try (Table table = util.getConnection().getTable(TEST_TABLE);
+ RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
+ Map<byte[],String> results = hello(table, null, ROW_A, ROW_C);
+ verifyRegionResults(locator, results, "Who are you?", ROW_A);
+ verifyRegionResults(locator, results, "Who are you?", ROW_B);
+ verifyRegionResults(locator, results, "Who are you?", ROW_C);
+ }
+ }
+
+ @Test
+ public void testNullReturn() throws Throwable {
+ try (Table table = util.getConnection().getTable(TEST_TABLE);
+ RegionLocator locator = util.getConnection().getRegionLocator(TEST_TABLE)) {
+ Map<byte[],String> results = hello(table, "nobody", ROW_A, ROW_C);
+ verifyRegionResults(locator, results, null, ROW_A);
+ verifyRegionResults(locator, results, null, ROW_B);
+ verifyRegionResults(locator, results, null, ROW_C);
+ }
+ }
+
+ @Test
+ public void testEmptyReturnType() throws Throwable {
+ try (Table table = util.getConnection().getTable(TEST_TABLE)) {
+ Map<byte[],String> results = noop(table, ROW_A, ROW_C);
+ assertEquals("Should have results from three regions", 3, results.size());
+ // all results should be null
+ for (Object v : results.values()) {
+ assertNull(v);
+ }
+ }
+ }
+
+ private void verifyRegionResults(RegionLocator table,
+ Map<byte[],String> results, byte[] row) throws Exception {
+ verifyRegionResults(table, results, "pong", row);
+ }
+
+ private void verifyRegionResults(RegionLocator regionLocator,
+ Map<byte[], String> results, String expected, byte[] row)
+ throws Exception {
+ for (Map.Entry<byte [], String> e: results.entrySet()) {
+ LOG.info("row=" + Bytes.toString(row) + ", expected=" + expected +
+ ", result key=" + Bytes.toString(e.getKey()) +
+ ", value=" + e.getValue());
+ }
+ HRegionLocation loc = regionLocator.getRegionLocation(row, true);
+ byte[] region = loc.getRegionInfo().getRegionName();
+ assertTrue("Results should contain region " +
+ Bytes.toStringBinary(region) + " for row '" + Bytes.toStringBinary(row)+ "'",
+ results.containsKey(region));
+ assertEquals("Invalid result for row '"+Bytes.toStringBinary(row)+"'",
+ expected, results.get(region));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
new file mode 100644
index 0000000..f54c632
--- /dev/null
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+ conf1.set("hbase.replication.source.fs.conf.provider",
+ TestSourceFSConfigurationProvider.class.getCanonicalName());
+ String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
+ classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
+ conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
+ }
+
+ TestReplicationBase.setUpBeforeClass();
+ }
+
+ @Override
+ public void testSyncUpTool() throws Exception {
+ /**
+ * Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
+ * 'cf1' : replicated 'norep': not replicated
+ */
+ setupReplication();
+
+ /**
+ * Prepare 16 random hfile ranges required for creating hfiles
+ */
+ Iterator<String> randomHFileRangeListIterator = null;
+ Set<String> randomHFileRanges = new HashSet<String>(16);
+ for (int i = 0; i < 16; i++) {
+ randomHFileRanges.add(UUID.randomUUID().toString());
+ }
+ List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
+ Collections.sort(randomHFileRangeList);
+ randomHFileRangeListIterator = randomHFileRangeList.iterator();
+
+ /**
+ * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
+ * into cf1, and 3 rows into norep verify correctly replicated to slave
+ */
+ loadAndReplicateHFiles(true, randomHFileRangeListIterator);
+
+ /**
+ * Verify hfile load works step 1: stop hbase on Slave step 2: at Master: t1_syncup: Load
+ * another 100 rows into cf1 and 3 rows into norep t2_syncup: Load another 200 rows into cf1 and
+ * 3 rows into norep step 3: stop hbase on master, restart hbase on Slave step 4: verify Slave
+ * still has the rows before load t1_syncup: 100 rows from cf1 t2_syncup: 200 rows from cf1 step
+ * 5: run syncup tool on Master step 6: verify that hfiles show up on Slave and 'norep' does not
+ * t1_syncup: 200 rows from cf1 t2_syncup: 400 rows from cf1 verify correctly replicated to
+ * Slave
+ */
+ mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
+
+ }
+
+ private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
+ throws Exception {
+ LOG.debug("mimicSyncUpAfterBulkLoad");
+ utility2.shutdownMiniHBaseCluster();
+
+ loadAndReplicateHFiles(false, randomHFileRangeListIterator);
+
+ int rowCount_ht1Source = utility1.countRows(ht1Source);
+ assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
+ rowCount_ht1Source);
+
+ int rowCount_ht2Source = utility1.countRows(ht2Source);
+ assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
+ rowCount_ht2Source);
+
+ utility1.shutdownMiniHBaseCluster();
+ utility2.restartHBaseCluster(1);
+
+ Thread.sleep(SLEEP_TIME);
+
+ // Before sync up
+ int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+ int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+
+ // Run sync up tool
+ syncUp(utility1);
+
+ // After syun up
+ for (int i = 0; i < NB_RETRIES; i++) {
+ syncUp(utility1);
+ rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+ rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ if (i == NB_RETRIES - 1) {
+ if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
+ // syncUP still failed. Let's look at the source in case anything wrong there
+ utility1.restartHBaseCluster(1);
+ rowCount_ht1Source = utility1.countRows(ht1Source);
+ LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
+ rowCount_ht2Source = utility1.countRows(ht2Source);
+ LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
+ }
+ assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
+ rowCount_ht1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
+ rowCount_ht2TargetAtPeer1);
+ }
+ if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
+ LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
+ break;
+ } else {
+ LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+ + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+ + rowCount_ht2TargetAtPeer1);
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
+ Iterator<String> randomHFileRangeListIterator) throws Exception {
+ LOG.debug("loadAndReplicateHFiles");
+
+ // Load 100 + 3 hfiles to t1_syncup.
+ byte[][][] hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
+ 100);
+
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
+ hfileRanges, 3);
+
+ // Load 200 + 3 hfiles to t2_syncup.
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
+ 200);
+
+ hfileRanges =
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
+ hfileRanges, 3);
+
+ if (verifyReplicationOnSlave) {
+ // ensure replication completed
+ wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
+ "t1_syncup has 103 rows on source, and 100 on slave1");
+
+ wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
+ "t2_syncup has 203 rows on source, and 200 on slave1");
+ }
+ }
+
+ private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+ Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+ Path dir = utility1.getDataTestDirOnTestFS(testName);
+ FileSystem fs = utility1.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(fam));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ + hfileIdx++), fam, row, from, to, numOfRows);
+ }
+
+ final TableName tableName = source.getName();
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
+ String[] args = { dir.toString(), tableName.toString() };
+ loader.run(args);
+ }
+
+ private void wait(Table target, int expectedCount, String msg) throws IOException,
+ InterruptedException {
+ for (int i = 0; i < NB_RETRIES; i++) {
+ int rowCount_ht2TargetAtPeer1 = utility2.countRows(target);
+ if (i == NB_RETRIES - 1) {
+ assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1);
+ }
+ if (expectedCount == rowCount_ht2TargetAtPeer1) {
+ break;
+ }
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/README.txt
----------------------------------------------------------------------
diff --git a/hbase-examples/README.txt b/hbase-examples/README.txt
index 6578bb4..78051a6 100644
--- a/hbase-examples/README.txt
+++ b/hbase-examples/README.txt
@@ -62,3 +62,7 @@ Example code.
2. Execute {make}.
3. Execute {./DemoClient}.
+Also includes example coprocessor endpoint examples. The protobuf files are at src/main/protobuf.
+See hbase-protocol README.txt for how to generate the example RowCountService Coprocessor
+Endpoint and Aggregator examples.
+
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-examples/pom.xml b/hbase-examples/pom.xml
index 22afa4db..2238857 100644
--- a/hbase-examples/pom.xml
+++ b/hbase-examples/pom.xml
@@ -169,6 +169,46 @@
<surefire.skipSecondPart>true</surefire.skipSecondPart>
</properties>
</profile>
+ <profile>
+ <id>compile-protobuf</id>
+ <activation>
+ <property>
+ <name>compile-protobuf</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-maven-plugins</artifactId>
+ <executions>
+ <execution>
+ <id>compile-protoc</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>protoc</goal>
+ </goals>
+ <configuration>
+ <imports>
+ <param>${basedir}/src/main/protobuf</param>
+ </imports>
+ <source>
+ <directory>${basedir}/src/main/protobuf</directory>
+ <!-- Unfortunately, Hadoop plugin does not support *.proto.
+ We have to individually list every proto file here -->
+ <includes>
+ <include>Examples.proto</include>
+ </includes>
+ </source>
+ <!--<output>${project.build.directory}/generated-sources/java</output>-->
+ <output>${basedir}/src/main/java/</output>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
<!-- Profiles for building against different hadoop versions -->
<!-- There are a lot of common dependencies used here, should investigate
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
index c9ab23c..7e6c290 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.Bu
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse.Builder;
import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -171,7 +171,7 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor
} catch (IOException ioe) {
LOG.error(ioe);
// Call ServerRpcController#getFailedOn() to retrieve this IOException at client side.
- ResponseConverter.setControllerException(controller, ioe);
+ CoprocessorRpcUtils.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
index 4309cdc..c2387c5 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
-import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
@@ -94,7 +94,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
response = ExampleProtos.CountResponse.newBuilder()
.setCount(count).build();
} catch (IOException ioe) {
- ResponseConverter.setControllerException(controller, ioe);
+ CoprocessorRpcUtils.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
@@ -129,7 +129,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
response = ExampleProtos.CountResponse.newBuilder()
.setCount(count).build();
} catch (IOException ioe) {
- ResponseConverter.setControllerException(controller, ioe);
+ CoprocessorRpcUtils.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/main/protobuf/Examples.proto
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/protobuf/Examples.proto b/hbase-examples/src/main/protobuf/Examples.proto
new file mode 100644
index 0000000..ed9ed07
--- /dev/null
+++ b/hbase-examples/src/main/protobuf/Examples.proto
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.coprocessor.example.generated";
+option java_outer_classname = "ExampleProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message CountRequest {
+}
+
+message CountResponse {
+ required int64 count = 1 [default = 0];
+}
+
+service RowCountService {
+ rpc getRowCount(CountRequest)
+ returns (CountResponse);
+ rpc getKeyValueCount(CountRequest)
+ returns (CountResponse);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
deleted file mode 100644
index 317081b..0000000
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.coprocessor.example;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest;
-import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.Builder;
-import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteRequest.DeleteType;
-import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteResponse;
-import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.BulkDeleteService;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.experimental.categories.Category;
-
-@Category({CoprocessorTests.class, MediumTests.class})
-public class TestBulkDeleteProtocol {
- private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
- private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
- private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
- private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
- private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- // @Ignore @BeforeClass
- public static void setupBeforeClass() throws Exception {
- TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
- BulkDeleteEndpoint.class.getName());
- TEST_UTIL.startMiniCluster(2);
- }
-
- // @Ignore @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- // @Ignore @Test
- public void testBulkDeleteEndpoint() throws Throwable {
- TableName tableName = TableName.valueOf("testBulkDeleteEndpoint");
- Table ht = createTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- byte[] rowkey = Bytes.toBytes(j);
- puts.add(createPut(rowkey, "v1"));
- }
- ht.put(puts);
- // Deleting all the rows.
- long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 5, DeleteType.ROW, null);
- assertEquals(100, noOfRowsDeleted);
-
- int rows = 0;
- for (Result result : ht.getScanner(new Scan())) {
- rows++;
- }
- assertEquals(0, rows);
- ht.close();
- }
-
- // @Ignore @Test
- public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
- throws Throwable {
- TableName tableName = TableName
- .valueOf("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
- Table ht = createTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- byte[] rowkey = Bytes.toBytes(j);
- puts.add(createPut(rowkey, "v1"));
- }
- ht.put(puts);
- // Deleting all the rows.
- long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW, null);
- assertEquals(100, noOfRowsDeleted);
-
- int rows = 0;
- for (Result result : ht.getScanner(new Scan())) {
- rows++;
- }
- assertEquals(0, rows);
- ht.close();
- }
-
- private long invokeBulkDeleteProtocol(TableName tableName, final Scan scan, final int rowBatchSize,
- final DeleteType deleteType, final Long timeStamp) throws Throwable {
- Table ht = TEST_UTIL.getConnection().getTable(tableName);
- long noOfDeletedRows = 0L;
- Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
- new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
- new BlockingRpcCallback<BulkDeleteResponse>();
-
- public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
- Builder builder = BulkDeleteRequest.newBuilder();
- builder.setScan(ProtobufUtil.toScan(scan));
- builder.setDeleteType(deleteType);
- builder.setRowBatchSize(rowBatchSize);
- if (timeStamp != null) {
- builder.setTimestamp(timeStamp);
- }
- service.delete(controller, builder.build(), rpcCallback);
- return rpcCallback.get();
- }
- };
- Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
- .getStartRow(), scan.getStopRow(), callable);
- for (BulkDeleteResponse response : result.values()) {
- noOfDeletedRows += response.getRowsDeleted();
- }
- ht.close();
- return noOfDeletedRows;
- }
-
- // @Ignore @Test
- public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
- TableName tableName = TableName.valueOf("testBulkDeleteWithConditionBasedDelete");
- Table ht = createTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- byte[] rowkey = Bytes.toBytes(j);
- String value = (j % 10 == 0) ? "v1" : "v2";
- puts.add(createPut(rowkey, value));
- }
- ht.put(puts);
- Scan scan = new Scan();
- FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
- SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
- CompareOp.EQUAL, Bytes.toBytes("v1"));
- // fl.addFilter(new FirstKeyOnlyFilter());
- fl.addFilter(scvf);
- scan.setFilter(fl);
- // Deleting all the rows where cf1:c1=v1
- long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
- assertEquals(10, noOfRowsDeleted);
-
- int rows = 0;
- for (Result result : ht.getScanner(new Scan())) {
- rows++;
- }
- assertEquals(90, rows);
- ht.close();
- }
-
- // @Ignore @Test
- public void testBulkDeleteColumn() throws Throwable {
- TableName tableName = TableName.valueOf("testBulkDeleteColumn");
- Table ht = createTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- byte[] rowkey = Bytes.toBytes(j);
- String value = (j % 10 == 0) ? "v1" : "v2";
- puts.add(createPut(rowkey, value));
- }
- ht.put(puts);
- Scan scan = new Scan();
- scan.addColumn(FAMILY1, QUALIFIER2);
- // Delete the column cf1:col2
- long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null);
- assertEquals(100, noOfRowsDeleted);
-
- int rows = 0;
- for (Result result : ht.getScanner(new Scan())) {
- assertEquals(2, result.getFamilyMap(FAMILY1).size());
- assertTrue(result.getColumnCells(FAMILY1, QUALIFIER2).isEmpty());
- assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER1).size());
- assertEquals(1, result.getColumnCells(FAMILY1, QUALIFIER3).size());
- rows++;
- }
- assertEquals(100, rows);
- ht.close();
- }
-
- // @Ignore @Test
- public void testBulkDeleteFamily() throws Throwable {
- TableName tableName = TableName.valueOf("testBulkDeleteFamily");
- HTableDescriptor htd = new HTableDescriptor(tableName);
- htd.addFamily(new HColumnDescriptor(FAMILY1));
- htd.addFamily(new HColumnDescriptor(FAMILY2));
- TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
- Table ht = TEST_UTIL.getConnection().getTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- Put put = new Put(Bytes.toBytes(j));
- put.addColumn(FAMILY1, QUALIFIER1, "v1".getBytes());
- put.addColumn(FAMILY2, QUALIFIER2, "v2".getBytes());
- puts.add(put);
- }
- ht.put(puts);
- Scan scan = new Scan();
- scan.addFamily(FAMILY1);
- // Delete the column family cf1
- long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null);
- assertEquals(100, noOfRowsDeleted);
- int rows = 0;
- for (Result result : ht.getScanner(new Scan())) {
- assertTrue(result.getFamilyMap(FAMILY1).isEmpty());
- assertEquals(1, result.getColumnCells(FAMILY2, QUALIFIER2).size());
- rows++;
- }
- assertEquals(100, rows);
- ht.close();
- }
-
- // @Ignore @Test
- public void testBulkDeleteColumnVersion() throws Throwable {
- TableName tableName = TableName.valueOf("testBulkDeleteColumnVersion");
- Table ht = createTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- Put put = new Put(Bytes.toBytes(j));
- byte[] value = "v1".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, 1234L, value);
- put.addColumn(FAMILY1, QUALIFIER2, 1234L, value);
- put.addColumn(FAMILY1, QUALIFIER3, 1234L, value);
- // Latest version values
- value = "v2".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, value);
- put.addColumn(FAMILY1, QUALIFIER2, value);
- put.addColumn(FAMILY1, QUALIFIER3, value);
- put.addColumn(FAMILY1, null, value);
- puts.add(put);
- }
- ht.put(puts);
- Scan scan = new Scan();
- scan.addFamily(FAMILY1);
- // Delete the latest version values of all the columns in family cf1.
- long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
- HConstants.LATEST_TIMESTAMP);
- assertEquals(100, noOfRowsDeleted);
- int rows = 0;
- scan = new Scan();
- scan.setMaxVersions();
- for (Result result : ht.getScanner(scan)) {
- assertEquals(3, result.getFamilyMap(FAMILY1).size());
- List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER1);
- assertEquals(1, column.size());
- assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
-
- column = result.getColumnCells(FAMILY1, QUALIFIER2);
- assertEquals(1, column.size());
- assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
-
- column = result.getColumnCells(FAMILY1, QUALIFIER3);
- assertEquals(1, column.size());
- assertTrue(CellUtil.matchingValue(column.get(0), "v1".getBytes()));
- rows++;
- }
- assertEquals(100, rows);
- ht.close();
- }
-
- // @Ignore @Test
- public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
- TableName tableName = TableName.valueOf("testBulkDeleteColumnVersionBasedOnTS");
- Table ht = createTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- Put put = new Put(Bytes.toBytes(j));
- // TS = 1000L
- byte[] value = "v1".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, 1000L, value);
- put.addColumn(FAMILY1, QUALIFIER2, 1000L, value);
- put.addColumn(FAMILY1, QUALIFIER3, 1000L, value);
- // TS = 1234L
- value = "v2".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, 1234L, value);
- put.addColumn(FAMILY1, QUALIFIER2, 1234L, value);
- put.addColumn(FAMILY1, QUALIFIER3, 1234L, value);
- // Latest version values
- value = "v3".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, value);
- put.addColumn(FAMILY1, QUALIFIER2, value);
- put.addColumn(FAMILY1, QUALIFIER3, value);
- puts.add(put);
- }
- ht.put(puts);
- Scan scan = new Scan();
- scan.addColumn(FAMILY1, QUALIFIER3);
- // Delete the column cf1:c3's one version at TS=1234
- long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION, 1234L);
- assertEquals(100, noOfRowsDeleted);
- int rows = 0;
- scan = new Scan();
- scan.setMaxVersions();
- for (Result result : ht.getScanner(scan)) {
- assertEquals(3, result.getFamilyMap(FAMILY1).size());
- assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER1).size());
- assertEquals(3, result.getColumnCells(FAMILY1, QUALIFIER2).size());
- List<Cell> column = result.getColumnCells(FAMILY1, QUALIFIER3);
- assertEquals(2, column.size());
- assertTrue(CellUtil.matchingValue(column.get(0), "v3".getBytes()));
- assertTrue(CellUtil.matchingValue(column.get(1), "v1".getBytes()));
- rows++;
- }
- assertEquals(100, rows);
- ht.close();
- }
-
- // @Ignore @Test
- public void testBulkDeleteWithNumberOfVersions() throws Throwable {
- TableName tableName = TableName.valueOf("testBulkDeleteWithNumberOfVersions");
- Table ht = createTable(tableName);
- List<Put> puts = new ArrayList<Put>(100);
- for (int j = 0; j < 100; j++) {
- Put put = new Put(Bytes.toBytes(j));
- // TS = 1000L
- byte[] value = "v1".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, 1000L, value);
- put.addColumn(FAMILY1, QUALIFIER2, 1000L, value);
- put.addColumn(FAMILY1, QUALIFIER3, 1000L, value);
- // TS = 1234L
- value = "v2".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, 1234L, value);
- put.addColumn(FAMILY1, QUALIFIER2, 1234L, value);
- put.addColumn(FAMILY1, QUALIFIER3, 1234L, value);
- // TS = 2000L
- value = "v3".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, 2000L, value);
- put.addColumn(FAMILY1, QUALIFIER2, 2000L, value);
- put.addColumn(FAMILY1, QUALIFIER3, 2000L, value);
- // Latest version values
- value = "v4".getBytes();
- put.addColumn(FAMILY1, QUALIFIER1, value);
- put.addColumn(FAMILY1, QUALIFIER2, value);
- put.addColumn(FAMILY1, QUALIFIER3, value);
- puts.add(put);
- }
- ht.put(puts);
-
- // Delete all the versions of columns cf1:c1 and cf1:c2 falling with the time range
- // [1000,2000)
- final Scan scan = new Scan();
- scan.addColumn(FAMILY1, QUALIFIER1);
- scan.addColumn(FAMILY1, QUALIFIER2);
- scan.setTimeRange(1000L, 2000L);
- scan.setMaxVersions();
-
- long noOfDeletedRows = 0L;
- long noOfVersionsDeleted = 0L;
- Batch.Call<BulkDeleteService, BulkDeleteResponse> callable =
- new Batch.Call<BulkDeleteService, BulkDeleteResponse>() {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<BulkDeleteResponse> rpcCallback =
- new BlockingRpcCallback<BulkDeleteResponse>();
-
- public BulkDeleteResponse call(BulkDeleteService service) throws IOException {
- Builder builder = BulkDeleteRequest.newBuilder();
- builder.setScan(ProtobufUtil.toScan(scan));
- builder.setDeleteType(DeleteType.VERSION);
- builder.setRowBatchSize(500);
- service.delete(controller, builder.build(), rpcCallback);
- return rpcCallback.get();
- }
- };
- Map<byte[], BulkDeleteResponse> result = ht.coprocessorService(BulkDeleteService.class, scan
- .getStartRow(), scan.getStopRow(), callable);
- for (BulkDeleteResponse response : result.values()) {
- noOfDeletedRows += response.getRowsDeleted();
- noOfVersionsDeleted += response.getVersionsDeleted();
- }
- assertEquals(100, noOfDeletedRows);
- assertEquals(400, noOfVersionsDeleted);
-
- int rows = 0;
- Scan scan1 = new Scan();
- scan1.setMaxVersions();
- for (Result res : ht.getScanner(scan1)) {
- assertEquals(3, res.getFamilyMap(FAMILY1).size());
- List<Cell> column = res.getColumnCells(FAMILY1, QUALIFIER1);
- assertEquals(2, column.size());
- assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
- assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
- column = res.getColumnCells(FAMILY1, QUALIFIER2);
- assertEquals(2, column.size());
- assertTrue(CellUtil.matchingValue(column.get(0), "v4".getBytes()));
- assertTrue(CellUtil.matchingValue(column.get(1), "v3".getBytes()));
- assertEquals(4, res.getColumnCells(FAMILY1, QUALIFIER3).size());
- rows++;
- }
- assertEquals(100, rows);
- ht.close();
- }
-
- private Table createTable(TableName tableName) throws IOException {
- HTableDescriptor htd = new HTableDescriptor(tableName);
- HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
- hcd.setMaxVersions(10);// Just setting 10 as I am not testing with more than 10 versions here
- htd.addFamily(hcd);
- TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
- Table ht = TEST_UTIL.getConnection().getTable(tableName);
- return ht;
- }
-
- private Put createPut(byte[] rowkey, String value) throws IOException {
- Put put = new Put(rowkey);
- put.addColumn(FAMILY1, QUALIFIER1, value.getBytes());
- put.addColumn(FAMILY1, QUALIFIER2, value.getBytes());
- put.addColumn(FAMILY1, QUALIFIER3, value.getBytes());
- return put;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java
deleted file mode 100644
index 1776ced..0000000
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRowCountEndpoint.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.coprocessor.example;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-
-import static junit.framework.Assert.*;
-
-/**
- * Test case demonstrating client interactions with the {@link RowCountEndpoint}
- * sample coprocessor Service implementation.
- */
-@Category({CoprocessorTests.class, MediumTests.class})
-public class TestRowCountEndpoint {
- private static final TableName TEST_TABLE = TableName.valueOf("testrowcounter");
- private static final byte[] TEST_FAMILY = Bytes.toBytes("f");
- private static final byte[] TEST_COLUMN = Bytes.toBytes("col");
-
- private static HBaseTestingUtility TEST_UTIL = null;
- private static Configuration CONF = null;
-
- // @Ignore @BeforeClass
- public static void setupBeforeClass() throws Exception {
- TEST_UTIL = new HBaseTestingUtility();
- CONF = TEST_UTIL.getConfiguration();
- CONF.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- RowCountEndpoint.class.getName());
-
- TEST_UTIL.startMiniCluster();
- TEST_UTIL.createTable(TEST_TABLE, new byte[][]{TEST_FAMILY});
- }
-
- // @Ignore @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- // @Ignore @Test
- public void testEndpoint() throws Throwable {
- Table table = TEST_UTIL.getConnection().getTable(TEST_TABLE);
-
- // insert some test rows
- for (int i=0; i<5; i++) {
- byte[] iBytes = Bytes.toBytes(i);
- Put p = new Put(iBytes);
- p.addColumn(TEST_FAMILY, TEST_COLUMN, iBytes);
- table.put(p);
- }
-
- final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
- Map<byte[],Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,
- null, null,
- new Batch.Call<ExampleProtos.RowCountService,Long>() {
- public Long call(ExampleProtos.RowCountService counter) throws IOException {
- ServerRpcController controller = new ServerRpcController();
- BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
- new BlockingRpcCallback<ExampleProtos.CountResponse>();
- counter.getRowCount(controller, request, rpcCallback);
- ExampleProtos.CountResponse response = rpcCallback.get();
- if (controller.failedOnException()) {
- throw controller.getFailedOn();
- }
- return (response != null && response.hasCount()) ? response.getCount() : 0;
- }
- });
- // should be one region with results
- assertEquals(1, results.size());
- Iterator<Long> iter = results.values().iterator();
- Long val = iter.next();
- assertNotNull(val);
- assertEquals(5l, val.longValue());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
deleted file mode 100644
index e97d528..0000000
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.coprocessor.example;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.junit.experimental.categories.Category;
-
-@Category({CoprocessorTests.class, MediumTests.class})
-public class TestZooKeeperScanPolicyObserver {
- private static final Log LOG = LogFactory.getLog(TestZooKeeperScanPolicyObserver.class);
- private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static final byte[] F = Bytes.toBytes("fam");
- private static final byte[] Q = Bytes.toBytes("qual");
- private static final byte[] R = Bytes.toBytes("row");
-
- // @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- System.out.println("HERE!!!!!!!!");
- // Test we can first start the ZK cluster by itself
- Configuration conf = TEST_UTIL.getConfiguration();
- conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
- ZooKeeperScanPolicyObserver.class.getName());
- TEST_UTIL.startMiniZKCluster();
- TEST_UTIL.startMiniCluster();
- }
-
- // @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- }
-
- // @Ignore @Test
- public void testScanPolicyObserver() throws Exception {
- TableName tableName =
- TableName.valueOf("testScanPolicyObserver");
- HTableDescriptor desc = new HTableDescriptor(tableName);
- HColumnDescriptor hcd = new HColumnDescriptor(F)
- .setMaxVersions(10)
- .setTimeToLive(1);
- desc.addFamily(hcd);
- TEST_UTIL.getHBaseAdmin().createTable(desc);
- Table t = TEST_UTIL.getConnection().getTable(tableName);
- long now = EnvironmentEdgeManager.currentTime();
-
- ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "test", null);
- ZooKeeper zk = zkw.getRecoverableZooKeeper().getZooKeeper();
- ZKUtil.createWithParents(zkw, ZooKeeperScanPolicyObserver.node);
- // let's say test last backup was 1h ago
- // using plain ZK here, because RecoverableZooKeeper add extra encoding to the data
- zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now - 3600*1000), -1);
-
- LOG.debug("Set time: "+Bytes.toLong(Bytes.toBytes(now - 3600*1000)));
-
- // sleep for 1s to give the ZK change a chance to reach the watcher in the observer.
- // TODO: Better to wait for the data to be propagated
- Thread.sleep(1000);
-
- long ts = now - 2000;
- Put p = new Put(R);
- p.addColumn(F, Q, ts, Q);
- t.put(p);
- p = new Put(R);
- p.addColumn(F, Q, ts + 1, Q);
- t.put(p);
-
- // these two should be expired but for the override
- // (their ts was 2s in the past)
- Get g = new Get(R);
- g.setMaxVersions(10);
- Result r = t.get(g);
- // still there?
- assertEquals(2, r.size());
-
- TEST_UTIL.flush(tableName);
- TEST_UTIL.compact(tableName, true);
-
- g = new Get(R);
- g.setMaxVersions(10);
- r = t.get(g);
- // still there?
- assertEquals(2, r.size());
- zk.setData(ZooKeeperScanPolicyObserver.node, Bytes.toBytes(now), -1);
- LOG.debug("Set time: "+now);
-
- TEST_UTIL.compact(tableName, true);
-
- g = new Get(R);
- g.setMaxVersions(10);
- r = t.get(g);
- // should be gone now
- assertEquals(0, r.size());
- t.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
index c83c5e4..16f1e71 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/DistributedHBaseCluster.java
@@ -32,11 +32,11 @@ import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ServerInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java
index a237805..b53d5d0 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestMetaReplicas.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.TestMetaWithReplicas;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -66,7 +67,7 @@ public class IntegrationTestMetaReplicas {
conf.get("zookeeper.znode.metaserver", "meta-region-server"));
// check that the data in the znode is parseable (this would also mean the znode exists)
byte[] data = ZKUtil.getData(zkw, primaryMetaZnode);
- ServerName.parseFrom(data);
+ ProtobufUtil.toServerName(data);
waitUntilZnodeAvailable(1);
waitUntilZnodeAvailable(2);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
index f41efc7..7ce86bd 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/ipc/IntegrationTestRpcClient.java
@@ -26,9 +26,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
-import com.google.protobuf.BlockingService;
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -48,9 +48,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.codec.Codec;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
-import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Pair;
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-procedure/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-procedure/pom.xml b/hbase-procedure/pom.xml
index 9d1ac8d..1a813b8 100644
--- a/hbase-procedure/pom.xml
+++ b/hbase-procedure/pom.xml
@@ -88,7 +88,7 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
- <artifactId>hbase-protocol</artifactId>
+ <artifactId>hbase-protocol-shaded</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
http://git-wip-us.apache.org/repos/asf/hbase/blob/95c1dc93/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index 6403cfd..84328a7 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -29,20 +29,18 @@ import java.util.Map;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ProcedureInfo;
-import org.apache.hadoop.hbase.ProcedureUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState;
-import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
/**
* Base Procedure class responsible to handle the Procedure Metadata
@@ -774,22 +772,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
}
/**
- * Helper to create the ProcedureInfo from Procedure.
- */
- @InterfaceAudience.Private
- public static ProcedureInfo createProcedureInfo(final Procedure proc, final NonceKey nonceKey) {
- RemoteProcedureException exception = proc.hasException() ? proc.getException() : null;
- return new ProcedureInfo(proc.getProcId(), proc.toStringClass(), proc.getOwner(),
- ProcedureUtil.convertToProcedureState(proc.getState()),
- proc.hasParent() ? proc.getParentProcId() : -1, nonceKey,
- exception != null
- ? new ProcedureUtil.ForeignExceptionMsg(
- RemoteProcedureException.toProto(exception.getSource(), exception.getCause()))
- : null,
- proc.getLastUpdate(), proc.getStartTime(), proc.getResult());
- }
-
- /**
* Helper to convert the procedure to protobuf.
* Used by ProcedureStore implementations.
*/
@@ -833,7 +815,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
byte[] result = proc.getResult();
if (result != null) {
- builder.setResult(ByteStringer.wrap(result));
+ builder.setResult(ByteString.copyFrom(result));
}
ByteString.Output stateStream = ByteString.newOutput();