You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/07/15 19:15:14 UTC
svn commit: r964496 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/io/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/test/java/org/apache/hadoop/hbase/client/
Author: jgray
Date: Thu Jul 15 17:15:14 2010
New Revision: 964496
URL: http://svn.apache.org/viewvc?rev=964496&view=rev
Log:
HBASE-2517 During reads when passed the specified time range, seek to next column (Pranav via jgray)
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Jul 15 17:15:14 2010
@@ -766,6 +766,8 @@ Release 0.21.0 - Unreleased
(Pranav via Ryan)
HBASE-2836 Speed mvn site building by removing generation of useless reports
HBASE-2808 Document the implementation of replication
+ HBASE-2517 During reads when passed the specified time range, seek to
+ next column (Pranav via jgray)
NEW FEATURES
HBASE-1961 HBase EC2 scripts
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java Thu Jul 15 17:15:14 2010
@@ -147,6 +147,23 @@ public class TimeRange implements Writab
return (timestamp >= minStamp);
}
+ /**
+ * Compare the given timestamp to this time range.
+ * @param timestamp the timestamp to test
+ * @return -1 if the timestamp is below the time range,
+ * 0 if the timestamp is within the time range,
+ * 1 if the timestamp is above the time range
+ */
+ public int compare(long timestamp) {
+ if (timestamp < minStamp) {
+ return -1;
+ } else if (timestamp >= maxStamp) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Thu Jul 15 17:15:14 2010
@@ -163,4 +163,39 @@ public class ExplicitColumnTracker imple
col.setCount(this.maxVersions);
}
}
+
+ /**
+ * This method is used to inform the column tracker that we are done with
+ * this column. We may get this information from external filters or the
+ * timestamp range, and we then need to relay this information to the
+ * tracker. It is required only in the case of ExplicitColumnTracker.
+ * @param bytes
+ * @param offset
+ * @param length
+ */
+ public void doneWithColumn(byte [] bytes, int offset, int length) {
+ while (this.column != null) {
+ int compare = Bytes.compareTo(column.getBuffer(), column.getOffset(),
+ column.getLength(), bytes, offset, length);
+ if (compare == 0) {
+ this.columns.remove(this.index);
+ if (this.columns.size() == this.index) {
+ // Will not hit any more columns in this storefile
+ this.column = null;
+ } else {
+ this.column = this.columns.get(this.index);
+ }
+ return;
+ } else if ( compare <= -1) {
+ if(++this.index != this.columns.size()) {
+ this.column = this.columns.get(this.index);
+ } else {
+ this.column = null;
+ }
+ } else {
+ return;
+ }
+ }
+ }
+
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Thu Jul 15 17:15:14 2010
@@ -111,9 +111,16 @@ public class KeyValueHeap implements Key
return false;
}
InternalScanner currentAsInternal = (InternalScanner)this.current;
- currentAsInternal.next(result, limit);
+ boolean mayContainsMoreRows = currentAsInternal.next(result, limit);
KeyValue pee = this.current.peek();
- if (pee == null) {
+ /*
+ * By definition, any InternalScanner must return false only when it has no
+ * further rows to be fetched. So, we can close a scanner if it returns
+ * false. All existing implementations seem to be fine with this. It is much
+ * more efficient to close scanners which are not needed than keep them in
+ * the heap. This is also required for certain optimizations.
+ */
+ if (pee == null || !mayContainsMoreRows) {
this.current.close();
} else {
this.heap.add(this.current);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Thu Jul 15 17:15:14 2010
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
@@ -34,6 +35,7 @@ public class ScanQueryMatcher extends Qu
// Optimization so we can skip lots of compares when we decide to skip
// to the next row.
private boolean stickyNextRow;
+ private byte[] stopRow;
/**
* Constructs a QueryMatcher for a Scan.
@@ -50,6 +52,7 @@ public class ScanQueryMatcher extends Qu
this.oldestStamp = System.currentTimeMillis() - ttl;
this.rowComparator = rowComparator;
this.deletes = new ScanDeleteTracker();
+ this.stopRow = scan.getStopRow();
this.startKey = KeyValue.createFirstOnRow(scan.getStartRow());
this.filter = scan.getFilter();
@@ -140,17 +143,37 @@ public class ScanQueryMatcher extends Qu
return MatchCode.SKIP;
}
- if (!tr.withinTimeRange(timestamp)) {
+ if (!this.deletes.isEmpty() &&
+ deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
return MatchCode.SKIP;
}
- if (!this.deletes.isEmpty() &&
- deletes.isDeleted(bytes, offset, qualLength, timestamp)) {
+ int timestampComparison = tr.compare(timestamp);
+ if (timestampComparison >= 1) {
return MatchCode.SKIP;
+ } else if (timestampComparison <= -1) {
+ return getNextRowOrNextColumn(bytes, offset, qualLength);
}
- MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
+ /**
+ * Filters should be checked before checking column trackers. If we do
+ * otherwise, as was previously being done, the ColumnTracker may increment
+ * its counter even for a KV that is later discarded by the Filter. This
+ * would lead to incorrect results in certain cases.
+ */
+ if (filter != null) {
+ ReturnCode filterResponse = filter.filterKeyValue(kv);
+ if (filterResponse == ReturnCode.SKIP) {
+ return MatchCode.SKIP;
+ } else if (filterResponse == ReturnCode.NEXT_COL) {
+ return getNextRowOrNextColumn(bytes, offset, qualLength);
+ } else if (filterResponse == ReturnCode.NEXT_ROW) {
+ stickyNextRow = true;
+ return MatchCode.SEEK_NEXT_ROW;
+ }
+ }
+ MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength);
// if SKIP -> SEEK_NEXT_COL
// if (NEXT,DONE) -> SEEK_NEXT_ROW
// if (INCLUDE) -> INCLUDE
@@ -161,24 +184,35 @@ public class ScanQueryMatcher extends Qu
return MatchCode.SEEK_NEXT_ROW;
}
- // else INCLUDE
- // if (colChecker == MatchCode.INCLUDE)
- // give the filter a chance to run.
- if (filter == null)
- return MatchCode.INCLUDE;
-
- ReturnCode filterResponse = filter.filterKeyValue(kv);
- if (filterResponse == ReturnCode.INCLUDE)
- return MatchCode.INCLUDE;
+ return MatchCode.INCLUDE;
- if (filterResponse == ReturnCode.SKIP)
- return MatchCode.SKIP;
- else if (filterResponse == ReturnCode.NEXT_COL)
+ }
+
+ public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+ int qualLength) {
+ if (columns instanceof ExplicitColumnTracker) {
+ //We only come here when we know that columns is an instance of
+ //ExplicitColumnTracker so we should never have a cast exception
+ ((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset,
+ qualLength);
+ if (columns.getColumnHint() == null) {
+ return MatchCode.SEEK_NEXT_ROW;
+ } else {
+ return MatchCode.SEEK_NEXT_COL;
+ }
+ } else {
return MatchCode.SEEK_NEXT_COL;
- // else if (filterResponse == ReturnCode.NEXT_ROW)
+ }
+ }
- stickyNextRow = true;
- return MatchCode.SEEK_NEXT_ROW;
+ public boolean moreRowsMayExistAfter(KeyValue kv) {
+ if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
+ rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
+ kv.getRowLength(), stopRow, 0, stopRow.length) >= 0) {
+ return false;
+ } else {
+ return true;
+ }
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Thu Jul 15 17:15:14 2010
@@ -256,6 +256,10 @@ class StoreScanner implements KeyValueSc
return false;
case SEEK_NEXT_ROW:
+ if (!matcher.moreRowsMayExistAfter(kv)) {
+ outResult.addAll(results);
+ return false;
+ }
heap.next();
break;
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java?rev=964496&r1=964495&r2=964496&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java Thu Jul 15 17:15:14 2010
@@ -79,6 +79,180 @@ public class TestMultipleTimestamps {
}
@Test
+ public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+ "ColumnMiltipleTimestamps");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows = new Integer[] {1, 3, 5, 7};
+ Integer[] putColumns = new Integer[] { 1, 3, 5};
+ Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+ Integer[] scanRows = new Integer[] {3, 5};
+ Integer[] scanColumns = new Integer[] {3};
+ Long[] scanTimestamps = new Long[] {3L, 4L};
+ int scanMaxVersions = 2;
+
+ put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+ flush();
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(2, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+ checkOneCell(kvs[1], FAMILY, 3, 3, 3);
+ kvs = scanner.next().raw();
+ assertEquals(2, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+ checkOneCell(kvs[1], FAMILY, 5, 3, 3);
+ }
+
+ @Test
+ public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+ "ColumnMiltipleTimestamps");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows = new Integer[] {1, 3, 5, 7};
+ Integer[] putColumns = new Integer[] { 1, 3, 5};
+ Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+ Integer[] scanRows = new Integer[] {3, 5};
+ Integer[] scanColumns = new Integer[] {3,4};
+ Long[] scanTimestamps = new Long[] {3L};
+ int scanMaxVersions = 2;
+
+ put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+ flush();
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 3, 3, 3);
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+ }
+
+ @Test
+ public void testReseeksWithMultipleColumnMultipleTimestamp() throws
+ IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+ "ColumnMiltipleTimestamps");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows = new Integer[] {1, 3, 5, 7};
+ Integer[] putColumns = new Integer[] { 1, 3, 5};
+ Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
+
+ Integer[] scanRows = new Integer[] {5, 7};
+ Integer[] scanColumns = new Integer[] {3, 4, 5};
+ Long[] scanTimestamps = new Long[] {2l, 3L};
+ int scanMaxVersions = 2;
+
+ put(ht, FAMILY, putRows, putColumns, putTimestamps);
+
+ flush();
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 3);
+ checkOneCell(kvs[1], FAMILY, 5, 3, 2);
+ checkOneCell(kvs[2], FAMILY, 5, 5, 3);
+ checkOneCell(kvs[3], FAMILY, 5, 5, 2);
+ kvs = scanner.next().raw();
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 7, 3, 3);
+ checkOneCell(kvs[1], FAMILY, 7, 3, 2);
+ checkOneCell(kvs[2], FAMILY, 7, 5, 3);
+ checkOneCell(kvs[3], FAMILY, 7, 5, 2);
+ }
+
+ @Test
+ public void testReseeksWithMultipleFiles() throws IOException {
+ byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
+ "ColumnMiltipleTimestamps");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ Integer[] putRows1 = new Integer[] {1, 2, 3};
+ Integer[] putColumns1 = new Integer[] { 2, 5, 6};
+ Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
+
+ Integer[] putRows2 = new Integer[] {6, 7};
+ Integer[] putColumns2 = new Integer[] {3, 6};
+ Long[] putTimestamps2 = new Long[] {4L, 5L};
+
+ Integer[] putRows3 = new Integer[] {2, 3, 5};
+ Integer[] putColumns3 = new Integer[] {1, 2, 3};
+ Long[] putTimestamps3 = new Long[] {4L,8L};
+
+
+ Integer[] scanRows = new Integer[] {3, 5, 7};
+ Integer[] scanColumns = new Integer[] {3, 4, 5};
+ Long[] scanTimestamps = new Long[] {2l, 4L};
+ int scanMaxVersions = 5;
+
+ put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
+ flush();
+ put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
+ flush();
+ put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
+
+ ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
+ scanTimestamps, scanMaxVersions);
+
+ KeyValue[] kvs;
+
+ kvs = scanner.next().raw();
+ assertEquals(2, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 3, 3, 4);
+ checkOneCell(kvs[1], FAMILY, 3, 5, 2);
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 5, 3, 4);
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 6, 3, 4);
+
+ kvs = scanner.next().raw();
+ assertEquals(1, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 7, 3, 4);
+ }
+
+ @Test
public void testWithVersionDeletes() throws Exception {
// first test from memstore (without flushing).
@@ -109,7 +283,8 @@ public class TestMultipleTimestamps {
// request a bunch of versions including the deleted version. We should
// only get back entries for the versions that exist.
- KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
+ Arrays.asList(2L, 3L, 4L, 5L));
assertEquals(3, kvs.length);
checkOneCell(kvs[0], FAMILY, 0, 0, 5);
checkOneCell(kvs[1], FAMILY, 0, 0, 3);
@@ -240,6 +415,44 @@ public class TestMultipleTimestamps {
return result.raw();
}
+ private ResultScanner scan(HTable ht, byte[] cf,
+ Integer[] rowIndexes, Integer[] columnIndexes,
+ Long[] versions, int maxVersions)
+ throws IOException {
+ Arrays.asList(rowIndexes);
+ byte startRow[] = Bytes.toBytes("row:" +
+ Collections.min( Arrays.asList(rowIndexes)));
+ byte endRow[] = Bytes.toBytes("row:" +
+ Collections.max( Arrays.asList(rowIndexes))+1);
+ Scan scan = new Scan(startRow, endRow);
+ for (Integer colIdx: columnIndexes) {
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ scan.addColumn(cf, column);
+ }
+ scan.setMaxVersions(maxVersions);
+ scan.setTimeRange(Collections.min(Arrays.asList(versions)),
+ Collections.max(Arrays.asList(versions))+1);
+ ResultScanner scanner = ht.getScanner(scan);
+ return scanner;
+ }
+
+ private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
+ Integer[] columnIndexes, Long[] versions)
+ throws IOException {
+ for (int rowIdx: rowIndexes) {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ Put put = new Put(row);
+ for(int colIdx: columnIndexes) {
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ for (long version: versions) {
+ put.add(cf, column, version, Bytes.toBytes("value-version-" +
+ version));
+ }
+ }
+ ht.put(put);
+ }
+ }
+
/**
* Insert in specific row/column versions with timestamps
* versionStart..versionEnd.