You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2015/02/26 03:46:40 UTC
[4/5] hbase git commit: HBASE-13106 Ensure endpoint-only table
coprocessors can be dynamically loaded
HBASE-13106 Ensure endpoint-only table coprocessors can be dynamically loaded
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a16603f1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a16603f1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a16603f1
Branch: refs/heads/branch-1.0
Commit: a16603f18942804abe040b4f13f27584b5e72863
Parents: 92ba3ed
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Feb 25 16:37:46 2015 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Feb 25 18:18:03 2015 -0800
----------------------------------------------------------------------
.../coprocessor/TestCoprocessorEndpoint.java | 9 +-
.../TestCoprocessorTableEndpoint.java | 181 +++++++++++++++++++
2 files changed, 185 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a16603f1/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
index 2bc08d8..166c7e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -71,7 +70,7 @@ public class TestCoprocessorEndpoint {
private static final Log LOG = LogFactory.getLog(TestCoprocessorEndpoint.class);
private static final TableName TEST_TABLE =
- TableName.valueOf("TestTable");
+ TableName.valueOf("TestCoprocessorEndpoint");
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
private static byte[] ROW = Bytes.toBytes("testRow");
@@ -93,17 +92,17 @@ public class TestCoprocessorEndpoint {
conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
util.startMiniCluster(2);
- Admin admin = new HBaseAdmin(conf);
+
+ Admin admin = util.getHBaseAdmin();
HTableDescriptor desc = new HTableDescriptor(TEST_TABLE);
desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
util.waitUntilAllRegionsAssigned(TEST_TABLE);
- admin.close();
Table table = new HTable(conf, TEST_TABLE);
for (int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
- put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+ put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
table.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/a16603f1/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
new file mode 100644
index 0000000..1e22c83
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorTableEndpoint.java
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestCoprocessorTableEndpoint {
+
+ private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
+ private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
+ private static final byte[] ROW = Bytes.toBytes("testRow");
+ private static final int ROWSIZE = 20;
+ private static final int rowSeperator1 = 5;
+ private static final int rowSeperator2 = 12;
+ private static final byte[][] ROWS = makeN(ROW, ROWSIZE);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(2);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testCoprocessorTableEndpoint() throws Throwable {
+ final TableName tableName = TableName.valueOf("testCoprocessorTableEndpoint");
+
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+ desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
+
+ createTable(desc);
+ verifyTable(tableName);
+ }
+
+ @Test
+ public void testDynamicCoprocessorTableEndpoint() throws Throwable {
+ final TableName tableName = TableName.valueOf("testDynamicCoprocessorTableEndpoint");
+
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(TEST_FAMILY));
+
+ createTable(desc);
+
+ desc.addCoprocessor(org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint.class.getName());
+ updateTable(desc);
+
+ verifyTable(tableName);
+ }
+
+ private static byte[][] makeN(byte[] base, int n) {
+ byte[][] ret = new byte[n][];
+ for (int i = 0; i < n; i++) {
+ ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
+ }
+ return ret;
+ }
+
+ private static Map<byte [], Long> sum(final Table table, final byte [] family,
+ final byte [] qualifier, final byte [] start, final byte [] end)
+ throws ServiceException, Throwable {
+ return table.coprocessorService(ColumnAggregationProtos.ColumnAggregationService.class,
+ start, end,
+ new Batch.Call<ColumnAggregationProtos.ColumnAggregationService, Long>() {
+ @Override
+ public Long call(ColumnAggregationProtos.ColumnAggregationService instance)
+ throws IOException {
+ BlockingRpcCallback<ColumnAggregationProtos.SumResponse> rpcCallback =
+ new BlockingRpcCallback<ColumnAggregationProtos.SumResponse>();
+ ColumnAggregationProtos.SumRequest.Builder builder =
+ ColumnAggregationProtos.SumRequest.newBuilder();
+ builder.setFamily(ByteStringer.wrap(family));
+ if (qualifier != null && qualifier.length > 0) {
+ builder.setQualifier(ByteStringer.wrap(qualifier));
+ }
+ instance.sum(null, builder.build(), rpcCallback);
+ return rpcCallback.get().getSum();
+ }
+ });
+ }
+
+ private static final void createTable(HTableDescriptor desc) throws Exception {
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ admin.createTable(desc, new byte[][]{ROWS[rowSeperator1], ROWS[rowSeperator2]});
+ TEST_UTIL.waitUntilAllRegionsAssigned(desc.getTableName());
+ Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
+ try {
+ for (int i = 0; i < ROWSIZE; i++) {
+ Put put = new Put(ROWS[i]);
+ put.addColumn(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
+ table.put(put);
+ }
+ } finally {
+ table.close();
+ }
+ }
+
+ private static void updateTable(HTableDescriptor desc) throws Exception {
+ Admin admin = TEST_UTIL.getHBaseAdmin();
+ admin.disableTable(desc.getTableName());
+ admin.modifyTable(desc.getTableName(), desc);
+ admin.enableTable(desc.getTableName());
+ }
+
+ private static final void verifyTable(TableName tableName) throws Throwable {
+ Table table = TEST_UTIL.getConnection().getTable(tableName);
+ try {
+ Map<byte[], Long> results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[0],
+ ROWS[ROWS.length-1]);
+ int sumResult = 0;
+ int expectedResult = 0;
+ for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ sumResult += e.getValue();
+ }
+ for (int i = 0; i < ROWSIZE; i++) {
+ expectedResult += i;
+ }
+ assertEquals("Invalid result", expectedResult, sumResult);
+
+ // scan: for region 2 and region 3
+ results.clear();
+ results = sum(table, TEST_FAMILY, TEST_QUALIFIER, ROWS[rowSeperator1], ROWS[ROWS.length-1]);
+ sumResult = 0;
+ expectedResult = 0;
+ for (Map.Entry<byte[], Long> e : results.entrySet()) {
+ sumResult += e.getValue();
+ }
+ for (int i = rowSeperator1; i < ROWSIZE; i++) {
+ expectedResult += i;
+ }
+ assertEquals("Invalid result", expectedResult, sumResult);
+ } finally {
+ table.close();
+ }
+ }
+}