You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ya...@apache.org on 2016/12/02 08:07:50 UTC

hbase git commit: HBASE-16886: hbase-client: scanner with reversed=true and small=true gets no result

Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 5a4972f9c -> c9f388bec


HBASE-16886: hbase-client: scanner with reversed=true and small=true gets no result

Signed-off-by: Phil Yang <ya...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9f388be
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9f388be
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9f388be

Branch: refs/heads/branch-1.2
Commit: c9f388bece8a89ff52baf358df63c8e5fbd7dc36
Parents: 5a4972f
Author: huzheng <op...@gmail.com>
Authored: Wed Oct 26 11:28:27 2016 +0800
Committer: Phil Yang <ya...@apache.org>
Committed: Fri Dec 2 16:02:31 2016 +0800

----------------------------------------------------------------------
 .../client/ClientSmallReversedScanner.java      | 115 ++++++++++++++--
 .../client/TestClientSmallReversedScanner.java  |  16 +--
 .../hbase/client/TestSmallReversedScanner.java  | 138 +++++++++++++++++++
 3 files changed, 248 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f388be/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
index d4de6a0..bd5575a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client;
 
 
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -29,13 +30,18 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable;
-import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.ResponseConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -49,8 +55,8 @@ import java.util.concurrent.ExecutorService;
 @InterfaceAudience.Private
 public class ClientSmallReversedScanner extends ReversedClientScanner {
   private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
-  private ScannerCallableWithReplicas smallScanCallable = null;
-  private SmallScannerCallableFactory callableFactory;
+  private ScannerCallableWithReplicas smallReversedScanCallable = null;
+  private SmallReversedScannerCallableFactory callableFactory;
 
   /**
    * Create a new ReversibleClientScanner for the specified table. Take note that the passed
@@ -80,7 +86,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
       throws IOException {
     this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout, new SmallScannerCallableFactory());
+        primaryOperationTimeout, new SmallReversedScannerCallableFactory());
   }
 
   /**
@@ -112,7 +118,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
   ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName,
       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
       RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
-      SmallScannerCallableFactory callableFactory) throws IOException {
+      SmallReversedScannerCallableFactory callableFactory) throws IOException {
     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
         primaryOperationTimeout);
     this.callableFactory = callableFactory;
@@ -136,6 +142,7 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
     byte[] localStartKey;
     int cacheNum = nbRows;
     boolean regionChanged = true;
+    boolean isFirstRegionToLocate = false;
     // if we're at end of table, close and return false to stop iterating
     if (this.currentRegion != null && currentRegionDone) {
       byte[] startKey = this.currentRegion.getStartKey();
@@ -158,6 +165,14 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
       localStartKey = createClosestRowBefore(lastResult.getRow());
     } else {
       localStartKey = this.scan.getStartRow();
+      isFirstRegionToLocate = true;
+    }
+
+    if (!isFirstRegionToLocate
+        && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
+      // Not the first region and the next start key is empty: there are no rows left to scan.
+      // Without this check the scanner could return the row with key 0x00 indefinitely.
+      return false;
     }
 
     if (LOG.isTraceEnabled()) {
@@ -165,9 +180,10 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
           + Bytes.toStringBinary(localStartKey) + "'");
     }
 
-    smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan,
-        getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(),
-        getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller);
+    smallReversedScanCallable =
+        callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(),
+          localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(),
+          getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate);
 
     if (this.scanMetrics != null && regionChanged) {
       this.scanMetrics.countOfRegions.incrementAndGet();
@@ -209,8 +225,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
       // exhausted current region.
       // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
       // we do a callWithRetries
-      values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout);
-      this.currentRegion = smallScanCallable.getHRegionInfo();
+      values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout);
+      this.currentRegion = smallReversedScanCallable.getHRegionInfo();
       long currentTime = System.currentTimeMillis();
       if (this.scanMetrics != null) {
         this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
@@ -229,8 +245,8 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
           this.lastResult = rs;
         }
       }
-      if (smallScanCallable.hasMoreResultsContext()) {
-        currentRegionDone = !smallScanCallable.getServerHasMoreResults();
+      if (smallReversedScanCallable.hasMoreResultsContext()) {
+        currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults();
       } else {
         currentRegionDone = countdown > 0;
       }
@@ -250,7 +266,80 @@ public class ClientSmallReversedScanner extends ReversedClientScanner {
   }
 
   @VisibleForTesting
-  protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) {
+  protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) {
     this.callableFactory = callableFactory;
   }
+
+  /**
+   * A reversed ScannerCallable which supports backward small scanning.
+   */
+  static class SmallReversedScannerCallable extends ReversedScannerCallable {
+
+    public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan,
+        ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory,
+        int caching, int replicaId) {
+      super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId);
+      this.setCaching(caching);
+    }
+
+    @Override
+    public Result[] call(int timeout) throws IOException {
+      if (this.closed) return null;
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException();
+      }
+      ClientProtos.ScanRequest request = RequestConverter.buildScanRequest(
+        getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true);
+      ClientProtos.ScanResponse response = null;
+      controller = controllerFactory.newController();
+      try {
+        controller.setPriority(getTableName());
+        controller.setCallTimeout(timeout);
+        response = getStub().scan(controller, request);
+        Result[] results = ResponseConverter.getResults(controller.cellScanner(), response);
+        if (response.hasMoreResultsInRegion()) {
+          setHasMoreResultsContext(true);
+          setServerHasMoreResults(response.getMoreResultsInRegion());
+        } else {
+          setHasMoreResultsContext(false);
+        }
+        // We need to update result metrics since we are overriding call()
+        updateResultsMetrics(results);
+        return results;
+      } catch (ServiceException se) {
+        throw ProtobufUtil.getRemoteException(se);
+      }
+    }
+
+    @Override
+    public ScannerCallable getScannerCallableForReplica(int id) {
+      return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(),
+          scanMetrics, locateStartRow, controllerFactory, getCaching(), id);
+    }
+  }
+
+  protected static class SmallReversedScannerCallableFactory {
+
+    public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
+        Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
+        RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout,
+        int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller,
+        boolean isFirstRegionToLocate) {
+      byte[] locateStartRow = null;
+      if (isFirstRegionToLocate
+          && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) {
+        // HBASE-16886: if no startRow is set, we use the range [MAX_BYTE_ARRAY, +oo) to locate a
+        // list of regions; the last region in that list is the one where our scan starts.
+        locateStartRow = ClientScanner.MAX_BYTE_ARRAY;
+      }
+
+      scan.setStartRow(localStartKey);
+      SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan,
+          scanMetrics, locateStartRow, controllerFactory, cacheNum, 0);
+      ScannerCallableWithReplicas scannerCallableWithReplicas =
+          new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan,
+              retries, scannerTimeout, cacheNum, conf, caller);
+      return scannerCallableWithReplicas;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f388be/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
index 4611d08..57b52e6 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientSmallReversedScanner.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallableFactory;
+import org.apache.hadoop.hbase.client.ClientSmallReversedScanner.SmallReversedScannerCallableFactory;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -102,15 +102,15 @@ public class TestClientSmallReversedScanner {
     };
   }
 
-  private SmallScannerCallableFactory getFactory(
+  private SmallReversedScannerCallableFactory getFactory(
       final ScannerCallableWithReplicas callableWithReplicas) {
-    return new SmallScannerCallableFactory() {
+    return new SmallReversedScannerCallableFactory() {
       @Override
       public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table,
           Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum,
           RpcControllerFactory controllerFactory, ExecutorService pool,
           int primaryOperationTimeout, int retries, int scannerTimeout, Configuration conf,
-          RpcRetryingCaller<Result[]> caller) {
+          RpcRetryingCaller<Result[]> caller, boolean isFirstRegionToLocate) {
         return callableWithReplicas;
       }
     };
@@ -135,7 +135,7 @@ public class TestClientSmallReversedScanner {
     // Intentionally leave a "default" caching size in the Scan. No matter the value, we
     // should continue based on the server context
 
-    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+    SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
 
     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@@ -204,7 +204,7 @@ public class TestClientSmallReversedScanner {
     // While the server returns 2 records per batch, we expect more records.
     scan.setCaching(2);
 
-    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+    SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
 
     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@@ -280,7 +280,7 @@ public class TestClientSmallReversedScanner {
     // While the server return 2 records per RPC, we expect there to be more records.
     scan.setCaching(2);
 
-    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+    SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
 
     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,
@@ -316,7 +316,7 @@ public class TestClientSmallReversedScanner {
     ScannerCallableWithReplicas callableWithReplicas = Mockito
         .mock(ScannerCallableWithReplicas.class);
 
-    SmallScannerCallableFactory factory = getFactory(callableWithReplicas);
+    SmallReversedScannerCallableFactory factory = getFactory(callableWithReplicas);
 
     try (ClientSmallReversedScanner csrs = new ClientSmallReversedScanner(conf, scan,
         TableName.valueOf("table"), clusterConn, rpcFactory, controllerFactory, pool,

http://git-wip-us.apache.org/repos/asf/hbase/blob/c9f388be/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java
new file mode 100644
index 0000000..3a4e92b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestSmallReversedScanner.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+
+@Category(MediumTests.class)
+public class TestSmallReversedScanner {
+  public static final Log LOG = LogFactory.getLog(TestSmallReversedScanner.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("testReversedSmall");
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("columnFamily");
+
+  private static Table htable = null;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+
+    // create a table with 4 regions: (-oo, b),[b,c),[c,d),[d,+oo)
+    byte[] bytes = Bytes.toBytes("bcd");
+    byte[][] splitKeys = new byte[bytes.length][];
+
+    for (int i = 0; i < bytes.length; i++) {
+      splitKeys[i] = new byte[] { bytes[i] };
+    }
+    htable = TEST_UTIL.createTable(TABLE_NAME, COLUMN_FAMILY, splitKeys);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    TEST_UTIL.deleteTableData(TABLE_NAME);
+  }
+
+  /**
+   * Covers rowKeys all in the last region, all in the first region, and one rowKey per region.
+   * @throws IOException
+   */
+  @Test
+  public void testSmallReversedScan01() throws IOException {
+    String[][] keysCases = new String[][] {
+            { "d0", "d1", "d2", "d3" }, // all rowKeys fit in the last region.
+            { "a0", "a1", "a2", "a3" }, // all rowKeys fit in the first region.
+            { "a0", "b1", "c2", "d3" }, // each region with a rowKey
+    };
+
+    for (int caseIndex = 0; caseIndex < keysCases.length; caseIndex++) {
+      testSmallReversedScanInternal(keysCases[caseIndex]);
+      TEST_UTIL.deleteTableData(TABLE_NAME);
+    }
+  }
+
+  private void testSmallReversedScanInternal(String[] inputRowKeys) throws IOException {
+    int rowCount = inputRowKeys.length;
+
+    for (int i = 0; i < rowCount; i++) {
+      Put put = new Put(Bytes.toBytes(inputRowKeys[i]));
+      put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(i));
+      htable.put(put);
+    }
+
+    Scan scan = new Scan();
+    scan.setReversed(true);
+    scan.setSmall(true);
+
+    ResultScanner scanner = htable.getScanner(scan);
+    Result r;
+    int value = rowCount;
+    while ((r = scanner.next()) != null) {
+      Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(--value));
+      Assert.assertArrayEquals(r.getRow(), Bytes.toBytes(inputRowKeys[value]));
+    }
+
+    Assert.assertEquals(value, 0);
+  }
+
+  /**
+   * Corner case: the table has 4 regions, (-oo,b),[b,c),[c,d),[d,+oo), and the only row, whose
+   * rowKey is byte[]={0x00}, is located in region (-oo,b). Tests that the reversed small scanner
+   * returns that row exactly once rather than infinitely many times.
+   * @throws IOException
+   */
+  @Test
+  public void testSmallReversedScan02() throws IOException {
+    Put put = new Put(new byte[] { (char) 0x00 });
+    put.addColumn(COLUMN_FAMILY, null, Bytes.toBytes(0));
+    htable.put(put);
+
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    scan.setReversed(true);
+    scan.setSmall(true);
+
+    ResultScanner scanner = htable.getScanner(scan);
+    Result r;
+    int count = 1;
+    while ((r = scanner.next()) != null) {
+      Assert.assertArrayEquals(r.getValue(COLUMN_FAMILY, null), Bytes.toBytes(0));
+      Assert.assertArrayEquals(r.getRow(), new byte[] { (char) 0x00 });
+      Assert.assertTrue(--count >= 0);
+    }
+    Assert.assertEquals(count, 0);
+  }
+}