You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/14 02:33:29 UTC
svn commit: r1594428 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/coprocessor/endpoints/
main/java/org/apache/hadoop/hbase/util/
test/java/org/apache/hadoop/hbase/coprocessor/endpoints/ t...
Author: liyin
Date: Wed May 14 00:33:29 2014
New Revision: 1594428
URL: http://svn.apache.org/r1594428
Log:
[master] Support startRow and stopRow in endpoint.
Author: daviddeng
Summary:
Check `startRow` and `stopRow` against each region's key range before calling the endpoint.
Add a new test case for this.
Test Plan: `TestLongAggregator`
Reviewers: adela
Reviewed By: adela
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1308056
Task ID: 4223215
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed May 14 00:33:29 2014
@@ -19,20 +19,20 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-
import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
/**
* Used to communicate with a single HBase table.
*
* @since 0.21.0
*/
-public interface HTableInterface {
+public interface HTableInterface extends AutoCloseable {
/**
* Gets the name of this table.
@@ -304,6 +304,7 @@ public interface HTableInterface {
*
* @throws IOException if a remote or network exception occurs.
*/
+ @Override
void close() throws IOException;
/**
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java Wed May 14 00:33:29 2014
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.coprocessor.endpoints;
+import java.lang.reflect.InvocationTargetException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.coprocess
import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
import org.apache.hadoop.hbase.regionserver.HRegionIf;
import org.apache.hadoop.hbase.regionserver.HRegionServerIf;
+import org.apache.hadoop.hbase.util.ExceptionUtils;
/**
* An endpoint server.
@@ -80,6 +82,12 @@ public class EndpointServer implements I
// Invoke the specified method with parameters, the return value is
// encoded and returned.
return ent.invoke(ep, methodName, params);
+ } catch (InvocationTargetException e) {
+ Throwable target = e.getTargetException();
+ if (target instanceof Exception) {
+ throw new ThriftHBaseException((Exception) target);
+ }
+ throw ExceptionUtils.toRuntimeException(target);
} catch (Exception e) {
throw new ThriftHBaseException(e);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java Wed May 14 00:33:29 2014
@@ -28,13 +28,13 @@ import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ExceptionUtils;
/**
@@ -97,10 +97,14 @@ public class HTableEndpointClient implem
try {
for (final HRegionInfo region : regions.keySet()) {
- // TODO compute startRow and stopRow
- T ep =
- getEndpointProxy(clazz, region, HConstants.EMPTY_BYTE_ARRAY,
- HConstants.EMPTY_BYTE_ARRAY);
+ if (!Bytes.rangesOverlapped(startRow, stopRow, region.getStartKey(),
+ region.getEndKey())) {
+ // no overlap, skipped.
+ continue;
+ }
+
+ T ep = getEndpointProxy(clazz, region, Bytes.nonNull(startRow),
+ Bytes.nonNull(stopRow));
results.put(region, caller.call(ep));
}
} catch (UndeclaredThrowableException e) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java Wed May 14 00:33:29 2014
@@ -50,9 +50,13 @@ public class LongAggregator implements I
}
private HRegionIf region;
+ private byte[] startRow;
+ private byte[] stopRow;
@Override
public void setContext(IEndpointContext context) throws IOException {
this.region = context.getRegion();
+ this.startRow = context.getStartRow();
+ this.stopRow = context.getStopRow();
}
private interface IUpdater {
@@ -78,6 +82,8 @@ public class LongAggregator implements I
builder.addFamily(family);
}
}
+ builder.setStartRow(startRow);
+ builder.setStopRow(stopRow);
try (InternalScanner scanner = this.region.getScanner(builder.create())) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Wed May 14 00:33:29 2014
@@ -1963,4 +1963,78 @@ public class Bytes {
}
return len;
}
+
+ /**
+ * Converts a null byte[] into an empty byte array; any other value is returned unchanged.
+ */
+ public static byte[] nonNull(byte[] bytes) {
+ if (bytes == null) {
+ return HConstants.EMPTY_BYTE_ARRAY;
+ }
+
+ return bytes;
+ }
+
+ /**
+ * Returns whether the range defined by the start and end rows is non-empty.
+ *
+ * @param start the start bound, inclusive. null or zero-length array means
+ * no bound.
+ * @param end the end bound, exclusive. null or zero-length array means
+ * no bound.
+ */
+ public static boolean rangeNotEmpty(byte[] start, byte[] end) {
+ if (start == null || start.length == 0) {
+ return true;
+ }
+
+ if (end == null || end.length == 0) {
+ return true;
+ }
+
+ return compareTo(start, end) < 0;
+ }
+
+ /**
+ * Returns whether two ranges defined by start/end rows have a non-empty
+ * overlap.
+ */
+ public static boolean rangesOverlapped(byte[] start1, byte[] end1,
+ byte[] start2, byte[] end2) {
+ Pair<byte[], byte[]> startEnd = rangeIntersect(start1, end1, start2, end2);
+
+ return rangeNotEmpty(startEnd.getFirst(), startEnd.getSecond());
+ }
+
+ /**
+ * Returns the intersection of two ranges.
+ *
+ * @return Pair of start and end keys in order.
+ */
+ public static Pair<byte[], byte[]> rangeIntersect(byte[] start1,
+ byte[] end1, byte[] start2, byte[] end2) {
+ byte[] start = null;
+ if (start1 == null || start1.length == 0) {
+ start = start2;
+ } else if (start2 == null || start2.length == 0) {
+ start = start1;
+ } else if (compareTo(start1, start2) > 0) {
+ start = start1;
+ } else {
+ start = start2;
+ }
+
+ byte[] end = null;
+ if (end1 == null || end1.length == 0) {
+ end = end2;
+ } else if (end2 == null || end2.length == 0) {
+ end = end1;
+ } else if (compareTo(end1, end2) < 0) {
+ end = end1;
+ } else {
+ end = end2;
+ }
+
+ return new Pair<>(start, end);
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java Wed May 14 00:33:29 2014
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.StringBytes;
-import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -46,16 +46,16 @@ public class TestLongAggregator {
private static final byte[] FAMILY_NAME = Bytes.toBytes("f");
private static final byte[] QUALITY_NAME = Bytes.toBytes("q");
- @Before
- public void setUp() throws Exception {
+ @BeforeClass
+ public static void setUp() throws Exception {
TEST_UTIL.getConfiguration().setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
LongAggregator.Factory.class.getName());
TEST_UTIL.startMiniCluster();
}
- @After
- public void tearDown() throws Exception {
+ @AfterClass
+ public static void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@@ -129,4 +129,76 @@ public class TestLongAggregator {
// Check the final results
Assert.assertEquals("min", 1, min);
}
+
+ @Test
+ public void testCallWithRange() throws Exception {
+ final StringBytes TABLE_NAME = new StringBytes("testCallWithRange");
+ final byte[] PREFIX = Bytes.toBytes("fb");
+
+ // Create the table
+ try (HTableInterface table = TEST_UTIL.createTable(TABLE_NAME, FAMILY_NAME)) {
+ // Put some values
+ for (int i = 1; i <= 9; i++) {
+ table.put(new Put(Bytes.toBytes("row" + i)).add(FAMILY_NAME,
+ QUALITY_NAME, Bytes.add(PREFIX, Bytes.toBytes((long) i))));
+ }
+
+ // Calling endpoints.
+ IEndpointClient cp = (IEndpointClient) table;
+ Map<HRegionInfo, Long> results =
+ cp.coprocessorEndpoint(ILongAggregator.class,
+ Bytes.toBytes("row" + 2), Bytes.toBytes("row" + 8),
+ new Caller<ILongAggregator, Long>() {
+ @Override
+ public Long call(ILongAggregator client) throws IOException {
+ return client.sum(FAMILY_NAME, null, PREFIX.length);
+ }
+ });
+
+ // Aggregates results from all regions
+ long sum = 0;
+ for (Long res : results.values()) {
+ sum += res;
+ }
+
+ // Check the final results
+ Assert.assertEquals("sum", 27, sum);
+
+ results = cp.coprocessorEndpoint(ILongAggregator.class,
+ Bytes.toBytes("row" + 2), Bytes.toBytes("row" + 8),
+ new Caller<ILongAggregator, Long>() {
+ @Override
+ public Long call(ILongAggregator client) throws IOException {
+ return client.max(FAMILY_NAME, null, PREFIX.length);
+ }
+ });
+
+ // Aggregates results from all regions
+ long max = Long.MIN_VALUE;
+ for (Long res : results.values()) {
+ max = Math.max(max, res);
+ }
+
+ // Check the final results
+ Assert.assertEquals("max", 7, max);
+
+ results = cp.coprocessorEndpoint(ILongAggregator.class,
+ Bytes.toBytes("row" + 2), Bytes.toBytes("row" + 8),
+ new Caller<ILongAggregator, Long>() {
+ @Override
+ public Long call(ILongAggregator client) throws IOException {
+ return client.min(FAMILY_NAME, null, PREFIX.length);
+ }
+ });
+
+ // Aggregates results from all regions
+ long min = Long.MAX_VALUE;
+ for (Long res : results.values()) {
+ min = Math.min(min, res);
+ }
+
+ // Check the final results
+ Assert.assertEquals("min", 2, min);
+ }
+ }
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java Wed May 14 00:33:29 2014
@@ -29,6 +29,8 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLoader;
+import org.apache.hadoop.hbase.coprocessor.endpoints.LongAggregator;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -64,6 +66,9 @@ public class TestHBCpp {
TEST_UTIL.getConfiguration().set(HBaseTestingUtility.FS_TYPE_KEY,
HBaseTestingUtility.FS_TYPE_LFS);
+ TEST_UTIL.getConfiguration().setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
+ LongAggregator.Factory.class.getName());
+
TEST_UTIL.startMiniCluster();
// create the table as SimpleClient assumes.
byte[] tableName = Bytes.toBytes("t1");
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java Wed May 14 00:33:29 2014
@@ -343,4 +343,12 @@ public class TestBytes extends TestCase
}
}
}
+
+ public void testNonNull() throws Exception {
+ Assert.assertArrayEquals("nonNull(null)", new byte[0], Bytes.nonNull(null));
+ Assert.assertArrayEquals("nonNull([])", new byte[0],
+ Bytes.nonNull(new byte[0]));
+ Assert.assertArrayEquals("nonNull([1])", new byte[] { 1 },
+ Bytes.nonNull(new byte[] { 1 }));
+ }
}