Posted to commits@hbase.apache.org by jg...@apache.org on 2010/07/15 19:15:14 UTC

svn commit: r964496 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/io/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/client/

Author: jgray
Date: Thu Jul 15 17:15:14 2010
New Revision: 964496

URL: http://svn.apache.org/viewvc?rev=964496&view=rev
Log:
HBASE-2517  During reads when passed the specified time range, seek to next column (Pranav via jgray)

Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Jul 15 17:15:14 2010
@@ -766,6 +766,8 @@ Release 0.21.0 - Unreleased
                (Pranav via Ryan)
    HBASE-2836  Speed mvn site building by removing generation of useless reports
    HBASE-2808  Document the implementation of replication
+   HBASE-2517  During reads when passed the specified time range, seek to
+               next column (Pranav via jgray)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Thu Jul 15 17:15:14 2010
@@ -147,6 +147,23 @@ public class TimeRange implements Writab
     return (timestamp >= minStamp);
   }
 
+  /**
+   * Compare the timestamp to this timerange.
+   * @param timestamp the timestamp to compare
+   * @return -1 if the timestamp is below the timerange,
+   * 0 if the timestamp is within the timerange,
+   * 1 if the timestamp is at or above maxStamp (maxStamp is exclusive)
+   */
+  public int compare(long timestamp) {
+    if (timestamp < minStamp) {
+      return -1;
+    } else if (timestamp >= maxStamp) {
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

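The new compare() gives a three-way answer against the [minStamp, maxStamp)
interval, which is what lets the matcher distinguish "too new, keep looking at
older versions" from "too old, give up on this column". A minimal standalone
sketch of the semantics (not part of the patch; plain longs stand in for
TimeRange):

    public class TimeRangeCompareSketch {
      // Mirrors TimeRange.compare(): minStamp is inclusive, maxStamp exclusive.
      static int compare(long ts, long minStamp, long maxStamp) {
        if (ts < minStamp) return -1;   // below the range
        if (ts >= maxStamp) return 1;   // at or above maxStamp
        return 0;                       // within the range
      }

      public static void main(String[] args) {
        // Range [3, 5): 2 is below, 4 is inside, 5 itself is outside.
        System.out.println(compare(2L, 3L, 5L)); // -1
        System.out.println(compare(4L, 3L, 5L)); //  0
        System.out.println(compare(5L, 3L, 5L)); //  1
      }
    }
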
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Thu Jul 15 17:15:14 2010
@@ -163,4 +163,39 @@ public class ExplicitColumnTracker imple
       col.setCount(this.maxVersions);
     }
   }
+
+  /**
+   * Informs the column tracker that we are done with the given column. This
+   * can be triggered by an external filter or by the timestamp range, and
+   * the tracker must then skip past that column. It is needed only by
+   * ExplicitColumnTracker.
+   * @param bytes buffer containing the column qualifier
+   * @param offset offset of the qualifier in the buffer
+   * @param length length of the qualifier
+   */
+  public void doneWithColumn(byte [] bytes, int offset, int length) {
+    while (this.column != null) {
+      int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
+          column.getLength(), bytes, offset, length);
+      if (compare == 0) {
+        this.columns.remove(this.index);
+        if (this.columns.size() == this.index) {
+          // Will not hit any more columns in this storefile
+          this.column = null;
+        } else {
+          this.column = this.columns.get(this.index);
+        }
+        return;
+      } else if (compare < 0) {
+        if(++this.index != this.columns.size()) {
+          this.column = this.columns.get(this.index);
+        } else {
+          this.column = null;
+        }
+      } else {
+        return;
+      }
+    }
+  }
+
 }

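doneWithColumn() walks the tracker's sorted column list: entries that sort
before the finished column are stepped over, the finished column itself is
removed, and whatever follows it becomes the new hint (null meaning no further
columns in this storefile). A hedged sketch of that walk, using Strings in
place of byte[] qualifiers (hypothetical, for illustration only):

    import java.util.ArrayList;
    import java.util.List;

    public class DoneWithColumnSketch {
      // columns must be sorted; returns the new column hint, or null if the
      // tracker has no columns left to look for.
      static String doneWithColumn(List<String> columns, String done) {
        int index = 0;
        while (index < columns.size()) {
          int compare = columns.get(index).compareTo(done);
          if (compare == 0) {
            columns.remove(index);   // finished with this column entirely
            break;
          } else if (compare < 0) {
            index++;                 // step past columns that sort earlier
          } else {
            break;                   // already past the finished column
          }
        }
        return index < columns.size() ? columns.get(index) : null;
      }

      public static void main(String[] args) {
        List<String> cols = new ArrayList<>(List.of("a", "c", "e"));
        System.out.println(doneWithColumn(cols, "c")); // e (the new hint)
      }
    }
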
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Thu Jul 15 17:15:14 2010
@@ -111,9 +111,16 @@ public class KeyValueHeap implements Key
       return false;
     }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
-    currentAsInternal.next(result, limit);
+    boolean mayContainMoreRows = currentAsInternal.next(result, limit);
     KeyValue pee = this.current.peek();
-    if (pee == null) {
+    /*
+     * By definition, an InternalScanner returns false only when it has no
+     * further rows to fetch, so a scanner that returns false can be closed
+     * immediately. All existing implementations honor this contract. Closing
+     * scanners that are no longer needed is much cheaper than keeping them
+     * in the heap, and some optimizations depend on it.
+     */
+    if (pee == null || !mayContainMoreRows) {
       this.current.close();
     } else {
       this.heap.add(this.current);

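The heap change means an exhausted scanner is closed and dropped as soon as
next() reports it has no more rows, rather than being re-queued and compared
on every later call. A compile-only structural sketch of that decision, with a
hypothetical Scanner interface standing in for KeyValueScanner/InternalScanner:

    import java.util.PriorityQueue;

    interface Scanner extends Comparable<Scanner> {
      boolean next();   // false once no further rows can be fetched
      Object peek();    // null when exhausted
      void close();
    }

    class HeapSketch {
      private final PriorityQueue<Scanner> heap = new PriorityQueue<>();
      private Scanner current;

      boolean next() {
        boolean mayContainMoreRows = current.next();
        // Close a scanner that can never contribute again; keeping it in
        // the heap would only add compare work to every subsequent call.
        if (current.peek() == null || !mayContainMoreRows) {
          current.close();
        } else {
          heap.add(current);
        }
        current = heap.poll();
        return current != null;
      }
    }
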
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Thu Jul 15 17:15:14 2010
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
@@ -34,6 +35,7 @@ public class ScanQueryMatcher extends Qu
   // Optimization so we can skip lots of compares when we decide to skip
   // to the next row.
   private boolean stickyNextRow;
+  private byte[] stopRow;
 
   /**
    * Constructs a QueryMatcher for a Scan.
@@ -50,6 +52,7 @@ public class ScanQueryMatcher extends Qu
     this.oldestStamp = System.currentTimeMillis() - ttl;
     this.rowComparator = rowComparator;
     this.deletes =  new ScanDeleteTracker();
+    this.stopRow = scan.getStopRow();
     this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
     this.filter = scan.getFilter();
 
@@ -140,17 +143,37 @@ public class ScanQueryMatcher extends Qu
       return MatchCode.SKIP;
     }
 
-    if (!tr.withinTimeRange(timestamp)) {
+    if (!this.deletes.isEmpty() &&
+        deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
       return MatchCode.SKIP;
     }
 
-    if (!this.deletes.isEmpty() &&
-        deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
+    int timestampComparison = tr.compare(timestamp);
+    if (timestampComparison >= 1) {
       return MatchCode.SKIP;
+    } else if (timestampComparison <= -1) {
+      return getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
-    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
+    /*
+     * Filters should be checked before the column trackers. Otherwise, as
+     * happened previously, the ColumnTracker may increment its version
+     * counter for a KV that the Filter then discards, which would lead to
+     * incorrect results in certain cases.
+     */
+    if (filter != null) {
+      ReturnCode filterResponse = filter.filterKeyValue(kv);
+      if (filterResponse == ReturnCode.SKIP) {
+        return MatchCode.SKIP;
+      } else if (filterResponse == ReturnCode.NEXT_COL) {
+        return getNextRowOrNextColumn(bytes, offset, qualLength);
+      } else if (filterResponse == ReturnCode.NEXT_ROW) {
+        stickyNextRow = true;
+        return MatchCode.SEEK_NEXT_ROW;
+      }
+    }
 
+    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
     // if SKIP -> SEEK_NEXT_COL
     // if (NEXT,DONE) -> SEEK_NEXT_ROW
     // if (INCLUDE) -> INCLUDE
@@ -161,24 +184,35 @@ public class ScanQueryMatcher extends Qu
       return MatchCode.SEEK_NEXT_ROW;
     }
 
-    // else INCLUDE
-    // if (colChecker == MatchCode.INCLUDE)
-    // give the filter a chance to run.
-    if (filter == null)
-      return MatchCode.INCLUDE;
-
-    ReturnCode filterResponse = filter.filterKeyValue(kv);
-    if (filterResponse == ReturnCode.INCLUDE)
-      return MatchCode.INCLUDE;
+    return MatchCode.INCLUDE;
 
-    if (filterResponse == ReturnCode.SKIP)
-      return MatchCode.SKIP;
-    else if (filterResponse == ReturnCode.NEXT_COL)
+  }
+
+  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+      int qualLength) {
+    if (columns instanceof ExplicitColumnTracker) {
+      // We only come here when columns is an instance of
+      // ExplicitColumnTracker, so the cast cannot fail
+      ((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset,
+          qualLength);
+      if (columns.getColumnHint() == null) {
+        return MatchCode.SEEK_NEXT_ROW;
+      } else {
+        return MatchCode.SEEK_NEXT_COL;
+      }
+    } else {
       return MatchCode.SEEK_NEXT_COL;
-    // else if (filterResponse == ReturnCode.NEXT_ROW)
+    }
+  }
 
-    stickyNextRow = true;
-    return MatchCode.SEEK_NEXT_ROW;
+  public boolean moreRowsMayExistAfter(KeyValue kv) {
+    if (!Bytes.equals(stopRow, HConstants.EMPTY_END_ROW) &&
+        rowComparator.compareRows(kv.getBuffer(), kv.getRowOffset(),
+            kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {
+      return false;
+    } else {
+      return true;
+    }
   }
 
   /**

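Taken together, match() now checks deletes, then the time range (with an
early seek once the version is already older than the range), then the filter,
and only then the column tracker. A condensed, hypothetical sketch of that
ordering (getNextRowOrNextColumn() is collapsed to SEEK_NEXT_COL for brevity;
the real code may return SEEK_NEXT_ROW when an ExplicitColumnTracker runs out
of columns):

    public class MatchOrderSketch {
      enum MatchCode { SKIP, INCLUDE, SEEK_NEXT_COL, SEEK_NEXT_ROW }
      enum FilterCode { INCLUDE, SKIP, NEXT_COL, NEXT_ROW }

      static MatchCode match(boolean deleted, int trCompare, FilterCode filter) {
        if (deleted) return MatchCode.SKIP;
        if (trCompare > 0) return MatchCode.SKIP;          // newer than range; older versions may match
        if (trCompare < 0) return MatchCode.SEEK_NEXT_COL; // older than range; later versions older still
        // Filter before column tracker, so the tracker never counts a
        // version the filter is about to discard.
        switch (filter) {
          case SKIP:     return MatchCode.SKIP;
          case NEXT_COL: return MatchCode.SEEK_NEXT_COL;
          case NEXT_ROW: return MatchCode.SEEK_NEXT_ROW;
          default:       return MatchCode.INCLUDE;         // column tracker runs here
        }
      }

      public static void main(String[] args) {
        // A version older than the time range triggers an immediate seek.
        System.out.println(match(false, -1, FilterCode.INCLUDE)); // SEEK_NEXT_COL
      }
    }
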
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Jul 15 17:15:14 2010
@@ -256,6 +256,10 @@ class StoreScanner implements KeyValueSc
           return false;
 
         case SEEK_NEXT_ROW:
+          if (!matcher.moreRowsMayExistAfter(kv)) {
+            outResult.addAll(results);
+            return false;
+          }
           heap.next();
           break;
 

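With moreRowsMayExistAfter(), a SEEK_NEXT_ROW that has already reached the
scan's stopRow ends the scan outright instead of seeking. A hypothetical
byte[]-level mirror of that test (Arrays.compareUnsigned needs Java 9+; the
real code uses HBase's row comparator):

    import java.util.Arrays;

    public class StopRowSketch {
      static final byte[] EMPTY_END_ROW = new byte[0];

      // True while the current row is strictly before stopRow (or the scan
      // is unbounded); false means no later row can match.
      static boolean moreRowsMayExistAfter(byte[] row, byte[] stopRow) {
        if (Arrays.equals(stopRow, EMPTY_END_ROW)) return true;
        return Arrays.compareUnsigned(row, stopRow) < 0;
      }

      public static void main(String[] args) {
        byte[] stop = "row:5".getBytes();
        System.out.println(moreRowsMayExistAfter("row:3".getBytes(), stop)); // true
        System.out.println(moreRowsMayExistAfter("row:5".getBytes(), stop)); // false
      }
    }
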
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java Thu Jul 15 17:15:14 2010
@@ -79,6 +79,180 @@ public class TestMultipleTimestamps {
   }
 
   @Test
+  public void testReseeksWithOneColumnMultipleTimestamps() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMultipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {3, 5};
+    Integer[] scanColumns = new Integer[] {3};
+    Long[] scanTimestamps = new Long[] {3L, 4L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush();
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 3, 3, 3);
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 5, 3, 3);
+  }
+
+  @Test
+  public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {3, 5};
+    Integer[] scanColumns = new Integer[] {3, 4};
+    Long[] scanTimestamps = new Long[] {3L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush();
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 3);
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+  }
+
+  @Test
+  public void testReseeksWithMultipleColumnMultipleTimestamp() throws
+  IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows = new Integer[] {1, 3, 5, 7};
+    Integer[] putColumns = new Integer[] { 1, 3, 5};
+    Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+    Integer[] scanRows = new Integer[] {5, 7};
+    Integer[] scanColumns = new Integer[] {3, 4, 5};
+    Long[] scanTimestamps = new Long[] {2L, 3L};
+    int scanMaxVersions = 2;
+
+    put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+    flush();
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(4, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+    checkOneCell(kvs[1], FAMILY, 5, 3, 2);
+    checkOneCell(kvs[2], FAMILY, 5, 5, 3);
+    checkOneCell(kvs[3], FAMILY, 5, 5, 2);
+    kvs = scanner.next().raw();
+    assertEquals(4, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 7, 3, 3);
+    checkOneCell(kvs[1], FAMILY, 7, 3, 2);
+    checkOneCell(kvs[2], FAMILY, 7, 5, 3);
+    checkOneCell(kvs[3], FAMILY, 7, 5, 2);
+  }
+
+  @Test
+  public void testReseeksWithMultipleFiles() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+    "ColumnMiltipleTimestamps");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    Integer[] putRows1 = new Integer[] {1, 2, 3};
+    Integer[] putColumns1 = new Integer[] { 2, 5, 6};
+    Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
+
+    Integer[] putRows2 = new Integer[] {6, 7};
+    Integer[] putColumns2 = new Integer[] {3, 6};
+    Long[] putTimestamps2 = new Long[] {4L, 5L};
+
+    Integer[] putRows3 = new Integer[] {2, 3, 5};
+    Integer[] putColumns3 = new Integer[] {1, 2, 3};
+    Long[] putTimestamps3 = new Long[] {4L, 8L};
+
+
+    Integer[] scanRows = new Integer[] {3, 5, 7};
+    Integer[] scanColumns = new Integer[] {3, 4, 5};
+    Long[] scanTimestamps = new Long[] {2L, 4L};
+    int scanMaxVersions = 5;
+
+    put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
+    flush();
+    put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
+    flush();
+    put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
+
+    ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+        scanTimestamps, scanMaxVersions);
+
+    KeyValue[] kvs;
+
+    kvs = scanner.next().raw();
+    assertEquals(2, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+    checkOneCell(kvs[1], FAMILY, 3, 5, 2);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 6, 3, 4);
+
+    kvs = scanner.next().raw();
+    assertEquals(1, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 7, 3, 4);
+  }
+
+  @Test
   public void testWithVersionDeletes() throws Exception {
 
     // first test from memstore (without flushing).
@@ -109,7 +283,8 @@ public class TestMultipleTimestamps {
 
     // request a bunch of versions including the deleted version. We should
     // only get back entries for the versions that exist.
-    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
+        Arrays.asList(2L, 3L, 4L, 5L));
     assertEquals(3, kvs.length);
     checkOneCell(kvs[0], FAMILY, 0, 0, 5);
     checkOneCell(kvs[1], FAMILY, 0, 0, 3);
@@ -240,6 +415,44 @@ public class TestMultipleTimestamps {
     return result.raw();
   }
 
+  private ResultScanner scan(HTable ht, byte[] cf,
+      Integer[] rowIndexes, Integer[] columnIndexes,
+      Long[] versions, int maxVersions)
+  throws IOException {
+    byte startRow[] = Bytes.toBytes("row:" +
+        Collections.min(Arrays.asList(rowIndexes)));
+    // Parenthesized so the max row index is incremented numerically
+    byte endRow[] = Bytes.toBytes("row:" +
+        (Collections.max(Arrays.asList(rowIndexes)) + 1));
+    Scan scan = new Scan(startRow, endRow);
+    for (Integer colIdx: columnIndexes) {
+      byte column[] = Bytes.toBytes("column:" + colIdx);
+      scan.addColumn(cf, column);
+    }
+    scan.setMaxVersions(maxVersions);
+    scan.setTimeRange(Collections.min(Arrays.asList(versions)),
+        Collections.max(Arrays.asList(versions))+1);
+    ResultScanner scanner = ht.getScanner(scan);
+    return scanner;
+  }
+
+  private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
+      Integer[] columnIndexes, Long[] versions)
+  throws IOException {
+    for (int rowIdx: rowIndexes) {
+      byte row[] = Bytes.toBytes("row:" + rowIdx);
+      Put put = new Put(row);
+      for(int colIdx: columnIndexes) {
+        byte column[] = Bytes.toBytes("column:" + colIdx);
+        for (long version: versions) {
+          put.add(cf, column, version, Bytes.toBytes("value-version-" +
+              version));
+        }
+      }
+      ht.put(put);
+    }
+  }
+
   /**
    * Insert in specific row/column versions with timestamps
    * versionStart..versionEnd.