You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/05/14 02:33:29 UTC

svn commit: r1594428 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/coprocessor/endpoints/ main/java/org/apache/hadoop/hbase/util/ test/java/org/apache/hadoop/hbase/coprocessor/endpoints/ t...

Author: liyin
Date: Wed May 14 00:33:29 2014
New Revision: 1594428

URL: http://svn.apache.org/r1594428
Log:
[master] Support startRow and stopRow in endpoint.

Author: daviddeng

Summary:
Check `startRow` and `stopRow` against each region's key range before calling the endpoint.
Add a new test case for this.

Test Plan: `TestLongAggregator`

Reviewers: adela

Reviewed By: adela

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1308056

Task ID: 4223215

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed May 14 00:33:29 2014
@@ -19,20 +19,20 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
 /**
  * Used to communicate with a single HBase table.
  *
  * @since 0.21.0
  */
-public interface HTableInterface {
+public interface HTableInterface extends AutoCloseable {
 
   /**
    * Gets the name of this table.
@@ -304,6 +304,7 @@ public interface HTableInterface {
    *
    * @throws IOException if a remote or network exception occurs.
    */
+  @Override
   void close() throws IOException;
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java Wed May 14 00:33:29 2014
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.coprocessor.endpoints;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,6 +28,7 @@ import org.apache.hadoop.hbase.coprocess
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
 import org.apache.hadoop.hbase.regionserver.HRegionIf;
 import org.apache.hadoop.hbase.regionserver.HRegionServerIf;
+import org.apache.hadoop.hbase.util.ExceptionUtils;
 
 /**
  * An endpoint server.
@@ -80,6 +82,12 @@ public class EndpointServer implements I
       // Invoke the specified method with parameters, the return value is
       // encoded and returned.
       return ent.invoke(ep, methodName, params);
+    } catch (InvocationTargetException e) {
+      Throwable target = e.getTargetException();
+      if (target instanceof Exception) {
+        throw new ThriftHBaseException((Exception) target);
+      }
+      throw ExceptionUtils.toRuntimeException(target);
     } catch (Exception e) {
       throw new ThriftHBaseException(e);
     }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java Wed May 14 00:33:29 2014
@@ -28,13 +28,13 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.NavigableMap;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ServerCallable;
 import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ExceptionUtils;
 
 /**
@@ -97,10 +97,14 @@ public class HTableEndpointClient implem
 
     try {
       for (final HRegionInfo region : regions.keySet()) {
-        // TODO compute startRow and stopRow
-        T ep =
-            getEndpointProxy(clazz, region, HConstants.EMPTY_BYTE_ARRAY,
-                HConstants.EMPTY_BYTE_ARRAY);
+        if (!Bytes.rangesOverlapped(startRow, stopRow, region.getStartKey(),
+            region.getEndKey())) {
+          // no overlap, skipped.
+          continue;
+        }
+
+        T ep = getEndpointProxy(clazz, region, Bytes.nonNull(startRow),
+            Bytes.nonNull(stopRow));
         results.put(region, caller.call(ep));
       }
     } catch (UndeclaredThrowableException e) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/LongAggregator.java Wed May 14 00:33:29 2014
@@ -50,9 +50,13 @@ public class LongAggregator implements I
   }
 
   private HRegionIf region;
+  private byte[] startRow;
+  private byte[] stopRow;
   @Override
   public void setContext(IEndpointContext context) throws IOException {
     this.region = context.getRegion();
+    this.startRow = context.getStartRow();
+    this.stopRow = context.getStopRow();
   }
 
   private interface IUpdater {
@@ -78,6 +82,8 @@ public class LongAggregator implements I
         builder.addFamily(family);
       }
     }
+    builder.setStartRow(startRow);
+    builder.setStopRow(stopRow);
 
     try (InternalScanner scanner = this.region.getScanner(builder.create())) {
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Bytes.java Wed May 14 00:33:29 2014
@@ -1963,4 +1963,78 @@ public class Bytes {
     }
     return len;
   }
+
+  /**
+   * Converts a null byte[] into an empty byte array; other values are returned unchanged.
+   */
+  public static byte[] nonNull(byte[] bytes) {
+    if (bytes == null) {
+      return HConstants.EMPTY_BYTE_ARRAY;
+    }
+
+    return bytes;
+  }
+
+  /**
+   * Returns whether the range defined by the start and end rows is non-empty.
+   *
+   * @param start the start bound, inclusive. null or zero-length array means
+   *          no bound.
+   * @param end the end bound, exclusive. null or zero-length array means
+   *          no bound.
+   */
+  public static boolean rangeNotEmpty(byte[] start, byte[] end) {
+    if (start == null || start.length == 0) {
+      return true;
+    }
+
+    if (end == null || end.length == 0) {
+      return true;
+    }
+
+    return compareTo(start, end) < 0;
+  }
+
+  /**
+   * Returns whether two ranges defined by start/end rows have a non-empty
+   * overlap.
+   */
+  public static boolean rangesOverlapped(byte[] start1, byte[] end1,
+      byte[] start2, byte[] end2) {
+    Pair<byte[], byte[]> startEnd = rangeIntersect(start1, end1, start2, end2);
+
+    return rangeNotEmpty(startEnd.getFirst(), startEnd.getSecond());
+  }
+
+  /**
+   * Returns the intersection of two ranges.
+   *
+   * @return Pair of start and end keys in order.
+   */
+  public static Pair<byte[], byte[]> rangeIntersect(byte[] start1,
+      byte[] end1, byte[] start2, byte[] end2) {
+    byte[] start = null;
+    if (start1 == null || start1.length == 0) {
+      start = start2;
+    } else if (start2 == null || start2.length == 0) {
+      start = start1;
+    } else if (compareTo(start1, start2) > 0) {
+      start = start1;
+    } else {
+      start = start2;
+    }
+
+    byte[] end = null;
+    if (end1 == null || end1.length == 0) {
+      end = end2;
+    } else if (end2 == null || end2.length == 0) {
+      end = end1;
+    } else if (compareTo(end1, end2) < 0) {
+      end = end1;
+    } else {
+      end = end2;
+    }
+
+    return new Pair<>(start, end);
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestLongAggregator.java Wed May 14 00:33:29 2014
@@ -30,9 +30,9 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.StringBytes;
-import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -46,16 +46,16 @@ public class TestLongAggregator {
   private static final byte[] FAMILY_NAME = Bytes.toBytes("f");
   private static final byte[] QUALITY_NAME = Bytes.toBytes("q");
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUp() throws Exception {
     TEST_UTIL.getConfiguration().setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
         LongAggregator.Factory.class.getName());
 
     TEST_UTIL.startMiniCluster();
   }
 
-  @After
-  public void tearDown() throws Exception {
+  @AfterClass
+  public static void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -129,4 +129,76 @@ public class TestLongAggregator {
     // Check the final results
     Assert.assertEquals("min", 1, min);
   }
+
+  @Test
+  public void testCallWithRange() throws Exception {
+    final StringBytes TABLE_NAME = new StringBytes("testCallWithRange");
+    final byte[] PREFIX = Bytes.toBytes("fb");
+
+    // Create the table
+    try (HTableInterface table = TEST_UTIL.createTable(TABLE_NAME, FAMILY_NAME)) {
+      // Put some values
+      for (int i = 1; i <= 9; i++) {
+        table.put(new Put(Bytes.toBytes("row" + i)).add(FAMILY_NAME,
+            QUALITY_NAME, Bytes.add(PREFIX, Bytes.toBytes((long) i))));
+      }
+
+      // Calling endpoints.
+      IEndpointClient cp = (IEndpointClient) table;
+      Map<HRegionInfo, Long> results =
+          cp.coprocessorEndpoint(ILongAggregator.class,
+              Bytes.toBytes("row" + 2), Bytes.toBytes("row" + 8),
+              new Caller<ILongAggregator, Long>() {
+                @Override
+                public Long call(ILongAggregator client) throws IOException {
+                  return client.sum(FAMILY_NAME, null, PREFIX.length);
+                }
+              });
+
+      // Aggregates results from all regions
+      long sum = 0;
+      for (Long res : results.values()) {
+        sum += res;
+      }
+
+      // Check the final results
+      Assert.assertEquals("sum", 27, sum);
+
+      results = cp.coprocessorEndpoint(ILongAggregator.class,
+          Bytes.toBytes("row" + 2), Bytes.toBytes("row" + 8),
+          new Caller<ILongAggregator, Long>() {
+            @Override
+            public Long call(ILongAggregator client) throws IOException {
+              return client.max(FAMILY_NAME, null, PREFIX.length);
+            }
+          });
+
+      // Aggregates results from all regions
+      long max = Long.MIN_VALUE;
+      for (Long res : results.values()) {
+        max = Math.max(max, res);
+      }
+
+      // Check the final results
+      Assert.assertEquals("max", 7, max);
+
+      results = cp.coprocessorEndpoint(ILongAggregator.class,
+          Bytes.toBytes("row" + 2), Bytes.toBytes("row" + 8),
+          new Caller<ILongAggregator, Long>() {
+            @Override
+            public Long call(ILongAggregator client) throws IOException {
+              return client.min(FAMILY_NAME, null, PREFIX.length);
+            }
+          });
+
+      // Aggregates results from all regions
+      long min = Long.MAX_VALUE;
+      for (Long res : results.values()) {
+        min = Math.min(min, res);
+      }
+
+      // Check the final results
+      Assert.assertEquals("min", 2, min);
+    }
+  }
 }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/thrift/TestHBCpp.java Wed May 14 00:33:29 2014
@@ -29,6 +29,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLoader;
+import org.apache.hadoop.hbase.coprocessor.endpoints.LongAggregator;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -64,6 +66,9 @@ public class TestHBCpp {
     TEST_UTIL.getConfiguration().set(HBaseTestingUtility.FS_TYPE_KEY,
         HBaseTestingUtility.FS_TYPE_LFS);
 
+    TEST_UTIL.getConfiguration().setStrings(EndpointLoader.FACTORY_CLASSES_KEY,
+        LongAggregator.Factory.class.getName());
+
     TEST_UTIL.startMiniCluster();
     // create the table as SimpleClient assumes.
     byte[] tableName = Bytes.toBytes("t1");

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java?rev=1594428&r1=1594427&r2=1594428&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/TestBytes.java Wed May 14 00:33:29 2014
@@ -343,4 +343,12 @@ public class TestBytes extends TestCase 
       }
     }
   }
+
+  public void testNonNull() throws Exception {
+    Assert.assertArrayEquals("nonNull(null)", new byte[0], Bytes.nonNull(null));
+    Assert.assertArrayEquals("nonNull([])", new byte[0],
+        Bytes.nonNull(new byte[0]));
+    Assert.assertArrayEquals("nonNull([1])", new byte[] { 1 },
+        Bytes.nonNull(new byte[] { 1 }));
+  }
 }