You are viewing a plain-text version of this message; the canonical copy is in the commits@hbase.apache.org mailing-list archive.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/03/12 23:49:53 UTC

svn commit: r1576976 - in /hbase/branches/0.98: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-server/src/test/java/org/apache/hadoop/hbase/client/

Author: apurtell
Date: Wed Mar 12 22:49:52 2014
New Revision: 1576976

URL: http://svn.apache.org/r1576976
Log:
HBASE-9999 Add support for small reverse scan (Nicolas Liochon)

Added:
    hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
Modified:
    hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
    hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
    hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
    hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
    hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java

Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java Wed Mar 12 22:49:52 2014
@@ -63,7 +63,7 @@ public class ClientScanner extends Abstr
     protected final long maxScannerResultSize;
     private final HConnection connection;
     private final TableName tableName;
-    private final int scannerTimeout;
+    protected final int scannerTimeout;
     protected boolean scanMetricsPublished = false;
     protected RpcRetryingCaller<Result []> caller;
 

Added: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java?rev=1576976&view=auto
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java (added)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java Wed Mar 12 22:49:52 2014
@@ -0,0 +1,188 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+/**
+ * Client scanner for small reversed scans. Each batch of results is fetched with a single
+ * small-scan RPC built by {@link ClientSmallScanner}, so generally only one RPC is needed,
+ * unless the results cross multiple regions or the row count of the results exceeds the
+ * caching.
+ * <p/>
+ * For small scans, it will get better performance than {@link ReversedClientScanner}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ClientSmallReversedScanner extends ReversedClientScanner {
+  private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class);
+  // Callable for the next small-scan RPC; rebuilt by nextScanner() before every call.
+  private RegionServerCallable<Result[]> smallScanCallable = null;
+  // Set only when resuming inside the same region from lastResult: the resumed RPC starts
+  // at this row, so a first result carrying this row was already returned and is dropped.
+  private byte[] skipRowOfFirstResult = null;
+
+  /**
+   * Create a new ClientSmallReversedScanner for the specified table. Note that the
+   * passed {@link org.apache.hadoop.hbase.client.Scan}'s start row may be changed: every
+   * RPC overwrites it with the row that the next batch starts from.
+   * <p/>
+   * No scanner is set up here (see initializeScannerInConstruction()); the first RPC is
+   * issued on the first call to {@link #next()}.
+   *
+   * @param conf       The {@link org.apache.hadoop.conf.Configuration} to use.
+   * @param scan       {@link org.apache.hadoop.hbase.client.Scan} to use in this scanner
+   * @param tableName  The table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @throws java.io.IOException
+   */
+  public ClientSmallReversedScanner(Configuration conf, Scan scan, TableName tableName,
+                                    HConnection connection) throws IOException {
+    super(conf, scan, tableName, connection);
+  }
+
+  /**
+   * Prepares {@link #smallScanCallable} for the next RPC. Moves to the region before the
+   * current one when the current region is done, otherwise continues from the last
+   * returned result, or starts from the scan's start row on the very first call.
+   *
+   * @param nbRows            number of rows still wanted; when resuming from the last
+   *                          result one extra row is requested, to make up for the
+   *                          duplicate first row that will be skipped
+   * @param done              true if Server-side says we're done scanning (only consulted
+   *                          when currentRegionDone is true)
+   * @param currentRegionDone true if scan is over on current region
+   * @return true if a callable was prepared; false if the scan is finished, in which case
+   *         this scanner has already been closed
+   * @throws IOException
+   */
+  private boolean nextScanner(int nbRows, final boolean done,
+                              boolean currentRegionDone) throws IOException {
+    // Where to start the next getter
+    byte[] localStartKey;
+    int cacheNum = nbRows;
+    skipRowOfFirstResult = null;
+    // The current region is done. Close and return false to stop iterating if it is the
+    // first region of the table (null/empty start key), if checkScanStopRow(startKey)
+    // returns true, or if the server told us to stop.
+    if (this.currentRegion != null && currentRegionDone) {
+      byte[] startKey = this.currentRegion.getStartKey();
+      if (startKey == null
+          || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY)
+          || checkScanStopRow(startKey) || done) {
+        close();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Finished with small scan at " + this.currentRegion);
+        }
+        return false;
+      }
+      // Scanning backwards: continue from the closest row before this region's start key,
+      // which lies in the preceding region.
+      localStartKey = createClosestRowBefore(startKey);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Finished with region " + this.currentRegion);
+      }
+    } else if (this.lastResult != null) {
+      // Resume inside the same region from the last returned row. That row may come back
+      // again as the first result, so remember it for skipping and ask for one extra row.
+      localStartKey = this.lastResult.getRow();
+      skipRowOfFirstResult = this.lastResult.getRow();
+      cacheNum++;
+    } else {
+      // First RPC of this scanner: start from the scan's configured start row.
+      localStartKey = this.scan.getStartRow();
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Advancing internal small scanner to startKey at '"
+          + Bytes.toStringBinary(localStartKey) + "'");
+    }
+
+    // Note: this sets localStartKey as the start row of this.scan (see class comment).
+    smallScanCallable = ClientSmallScanner.getSmallScanCallable(
+        scan, getConnection(), getTable(), localStartKey, cacheNum);
+
+    // Count a region only when entering it, not when resuming inside the same region.
+    if (this.scanMetrics != null && skipRowOfFirstResult == null) {
+      this.scanMetrics.countOfRegions.incrementAndGet();
+    }
+    return true;
+  }
+
+  /**
+   * Returns the next row of the reversed scan, refilling the client-side cache with
+   * small-scan RPCs when it is empty. Returns null once the scan is exhausted or closed.
+   */
+  @Override
+  public Result next() throws IOException {
+    // If the scanner is closed and there's nothing left in the cache, next is a
+    // no-op.
+    if (cache.size() == 0 && this.closed) {
+      return null;
+    }
+    if (cache.size() == 0) {
+      Result[] values = null;
+      long remainingResultSize = maxScannerResultSize;
+      int countdown = this.caching;
+      boolean currentRegionDone = false;
+      // Values == null means server-side filter has determined we must STOP
+      while (remainingResultSize > 0 && countdown > 0
+          && nextScanner(countdown, values == null, currentRegionDone)) {
+        // Server returns a null values if scanning is to stop. Else,
+        // returns an empty array if scanning is to go on and we've just
+        // exhausted current region.
+        values = this.caller.callWithRetries(smallScanCallable, scannerTimeout);
+        // Remember which region answered so nextScanner() can step to the one before it.
+        this.currentRegion = smallScanCallable.getHRegionInfo();
+        long currentTime = System.currentTimeMillis();
+        if (this.scanMetrics != null) {
+          this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime
+              - lastNext);
+        }
+        lastNext = currentTime;
+        if (values != null && values.length > 0) {
+          for (int i = 0; i < values.length; i++) {
+            Result rs = values[i];
+            if (i == 0 && this.skipRowOfFirstResult != null
+                && Bytes.equals(skipRowOfFirstResult, rs.getRow())) {
+              // Skip the first result: it repeats the row we resumed from, which was
+              // already handed out by a previous batch.
+              continue;
+            }
+            cache.add(rs);
+            for (Cell kv : rs.rawCells()) {
+              remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
+            }
+            countdown--;
+            this.lastResult = rs;
+          }
+        }
+        // Still short of 'caching' rows after this RPC: treat the current region as
+        // exhausted, so the next nextScanner() call moves to the preceding region.
+        currentRegionDone = countdown > 0;
+      }
+    }
+
+    if (cache.size() > 0) {
+      return cache.poll();
+    }
+    // if we exhausted this scanner before calling close, write out the scan
+    // metrics
+    writeScanMetrics();
+    return null;
+  }
+
+
+  @Override
+  protected void initializeScannerInConstruction() throws IOException {
+    // No need to initialize the scanner when constructing instance, do it when
+    // calling next(). Do nothing here.
+  }
+
+  /**
+   * Publishes the scan metrics if they have not been published yet, and marks this
+   * scanner closed so that later next() calls return null once the cache is drained.
+   * NOTE(review): this override does not call super.close(); presumably because each
+   * small-scan RPC leaves no server-side scanner open — confirm.
+   */
+  @Override
+  public void close() {
+    if (!scanMetricsPublished) writeScanMetrics();
+    closed = true;
+  }
+
+}

Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java Wed Mar 12 22:49:52 2014
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,7 +27,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
@@ -153,21 +150,23 @@ public class ClientSmallScanner extends 
       LOG.trace("Advancing internal small scanner to startKey at '"
           + Bytes.toStringBinary(localStartKey) + "'");
     }
-    smallScanCallable = getSmallScanCallable(localStartKey, cacheNum);
+    smallScanCallable = getSmallScanCallable(
+        scan, getConnection(), getTable(), localStartKey, cacheNum);
     if (this.scanMetrics != null && skipRowOfFirstResult == null) {
       this.scanMetrics.countOfRegions.incrementAndGet();
     }
     return true;
   }
 
-  private RegionServerCallable<Result[]> getSmallScanCallable(
-      byte[] localStartKey, final int cacheNum) {
-    this.scan.setStartRow(localStartKey);
+  static RegionServerCallable<Result[]> getSmallScanCallable(
+      final Scan sc, HConnection connection, TableName table, byte[] localStartKey,
+      final int cacheNum) throws IOException { 
+    sc.setStartRow(localStartKey);
     RegionServerCallable<Result[]> callable = new RegionServerCallable<Result[]>(
-        getConnection(), getTable(), scan.getStartRow()) {
+        connection, table, sc.getStartRow()) {
       public Result[] call() throws IOException {
         ScanRequest request = RequestConverter.buildScanRequest(getLocation()
-            .getRegionInfo().getRegionName(), scan, cacheNum, true);
+          .getRegionInfo().getRegionName(), sc, cacheNum, true);
         ScanResponse response = null;
         PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
         try {

Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Mar 12 22:49:52 2014
@@ -718,15 +718,24 @@ public class HTable implements HTableInt
     if (scan.getCaching() <= 0) {
       scan.setCaching(getScannerCaching());
     }
-    if (scan.isSmall() && !scan.isReversed()) {
+
+    if (scan.isReversed()) {
+      if (scan.isSmall()) {
+        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
+            this.connection);
+      } else {
+        return new ReversedClientScanner(getConfiguration(), scan, getName(),
+            this.connection);
+      }
+    }
+
+    if (scan.isSmall()) {
       return new ClientSmallScanner(getConfiguration(), scan, getName(),
-          this.connection);
-    } else if (scan.isReversed()) {
-      return new ReversedClientScanner(getConfiguration(), scan, getName(),
-          this.connection);
+          this.connection, this.rpcCallerFactory);
+    } else {
+      return new ClientScanner(getConfiguration(), scan,
+          getName(), this.connection);
     }
-    return new ClientScanner(getConfiguration(), scan,
-        getName(), this.connection);
   }
 
   /**

Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java Wed Mar 12 22:49:52 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ExceptionUtil;
 
 /**
  * A reversed client scanner which support backward scanning
@@ -114,6 +115,7 @@ public class ReversedClientScanner exten
         this.scanMetrics.countOfRegions.incrementAndGet();
       }
     } catch (IOException e) {
+      ExceptionUtil.rethrowIfInterrupt(e);
       close();
       throw e;
     }
@@ -151,7 +153,7 @@ public class ReversedClientScanner exten
    * @param row
    * @return a new byte array which is the closest front row of the specified one
    */
-  private byte[] createClosestRowBefore(byte[] row) {
+  protected byte[] createClosestRowBefore(byte[] row) {
     if (row == null) {
       throw new IllegalArgumentException("The passed row is empty");
     }

Modified: hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java (original)
+++ hbase/branches/0.98/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java Wed Mar 12 22:49:52 2014
@@ -70,7 +70,7 @@ public class ReversedScannerCallable ext
         this.location = connection.getRegionLocation(tableName, row, reload);
         if (this.location == null) {
           throw new IOException("Failed to find location, tableName="
-              + tableName + ", row=" + Bytes.toString(row) + ", reload="
+              + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
               + reload);
         }
       } else {

Modified: hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1576976&r1=1576975&r2=1576976&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/branches/0.98/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed Mar 12 22:49:52 2014
@@ -5835,4 +5835,170 @@ public class TestFromClientSide {
     assertEquals(insertNum, count);
     table.close();
   }
+
+
+  /**
+   * Tests regular and small reversed scans on a table split into multiple regions.
+   * <p>
+   * The table is pre-split at 000, 002, ..., 010. For every split row, that row and the row
+   * obtained by incrementing its last byte are written, giving 12 rows in total.
+   */
+  @Test
+  public void testSmallReversedScanUnderMultiRegions() throws Exception {
+    // Test Initialization.
+    byte[] tableName = Bytes.toBytes("testSmallReversedScanUnderMultiRegions");
+    byte[][] splitRows = new byte[][]{
+        Bytes.toBytes("000"), Bytes.toBytes("002"), Bytes.toBytes("004"),
+        Bytes.toBytes("006"), Bytes.toBytes("008"), Bytes.toBytes("010")};
+    HTable table = TEST_UTIL.createTable(tableName, FAMILY, splitRows);
+    try {
+      TEST_UTIL.waitUntilAllRegionsAssigned(table.getName());
+
+      assertEquals(splitRows.length + 1, table.getRegionLocations().size());
+      for (byte[] splitRow : splitRows) {
+        Put put = new Put(splitRow);
+        put.add(FAMILY, QUALIFIER, VALUE);
+        table.put(put);
+
+        byte[] nextRow = Bytes.copy(splitRow);
+        nextRow[nextRow.length - 1]++;
+
+        put = new Put(nextRow);
+        put.add(FAMILY, QUALIFIER, VALUE);
+        table.put(put);
+      }
+
+      // Sanity check: a forward scan sees every row that was written.
+      ResultScanner scanner = table.getScanner(new Scan());
+      int count = 0;
+      try {
+        for (Result r : scanner) {
+          assertTrue(!r.isEmpty());
+          count++;
+        }
+      } finally {
+        scanner.close();
+      }
+      assertEquals(12, count);
+
+      reverseScanTest(table, false);
+      reverseScanTest(table, true);
+    } finally {
+      // Release the client-side table even if an assertion or put above fails.
+      table.close();
+    }
+  }
+
+  /**
+   * Runs a series of reversed scans over the 12-row, multi-region table built by
+   * {@link #testSmallReversedScanUnderMultiRegions()}. Each scan is checked for strictly
+   * descending row order and for the expected number of rows.
+   *
+   * @param table the multi-region table to scan
+   * @param small whether the bounded scans are marked small ({@link Scan#setSmall(boolean)})
+   * @throws IOException if opening or iterating a scanner fails
+   */
+  private void reverseScanTest(HTable table, boolean small) throws IOException {
+    // Full-table reversed scan.
+    // NOTE(review): this scan never calls setSmall(small), so it always exercises the regular
+    // reversed scanner; a small reversed full-table scan is not covered here.
+    Scan scan = new Scan();
+    scan.setReversed(true);
+    assertEquals(12, countDescendingRows(table, scan));
+
+    // Start row only: 002 001 000
+    assertEquals(3, countDescendingRows(table, createReversedScan(small, "002", null)));
+
+    // The stop row is exclusive: 002 001
+    assertEquals(2, countDescendingRows(table, createReversedScan(small, "002", "000")));
+
+    // 001 000
+    assertEquals(2, countDescendingRows(table, createReversedScan(small, "001", null)));
+
+    // Start at the first split row: 000
+    assertEquals(1, countDescendingRows(table, createReversedScan(small, "000", null)));
+
+    // Range spanning several regions: 006 005 004 003
+    assertEquals(4, countDescendingRows(table, createReversedScan(small, "006", "002")));
+  }
+
+  /**
+   * Builds a reversed scan starting at {@code startRow}, optionally bounded by the
+   * exclusive {@code stopRow}, and marked small when {@code small} is true.
+   */
+  private static Scan createReversedScan(boolean small, String startRow, String stopRow) {
+    Scan scan = new Scan();
+    scan.setSmall(small);
+    scan.setReversed(true);
+    scan.setStartRow(Bytes.toBytes(startRow));
+    if (stopRow != null) {
+      scan.setStopRow(Bytes.toBytes(stopRow));
+    }
+    return scan;
+  }
+
+  /**
+   * Runs {@code scan} against {@code table} and returns the number of rows returned.
+   * It asserts that every result is non-empty and that rows arrive in strictly descending
+   * order. The scanner is closed even when an assertion fails.
+   */
+  private static int countDescendingRows(HTable table, Scan scan) throws IOException {
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      int count = 0;
+      byte[] lastRow = null;
+      for (Result r : scanner) {
+        assertTrue(!r.isEmpty());
+        count++;
+        byte[] thisRow = r.getRow();
+        if (lastRow != null) {
+          assertTrue("Error scan order, last row= " + Bytes.toString(lastRow)
+              + ",this row=" + Bytes.toString(thisRow),
+              Bytes.compareTo(thisRow, lastRow) < 0);
+        }
+        lastRow = thisRow;
+      }
+      return count;
+    } finally {
+      scanner.close();
+    }
+  }
 }