You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/07/09 23:36:41 UTC
svn commit: r962700 - in /hbase/trunk: ./
src/main/java/org/apache/hadoop/hbase/filter/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/test/java/org/apache/hadoop/hbase/client/
Author: rawson
Date: Fri Jul 9 21:36:40 2010
New Revision: 962700
URL: http://svn.apache.org/viewvc?rev=962700&view=rev
Log:
HBASE-2793 Add ability to extract a specified list of versions of a column in a single roundtrip
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=962700&r1=962699&r2=962700&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Jul 9 21:36:40 2010
@@ -754,6 +754,8 @@ Release 0.21.0 - Unreleased
(Nicolas Spiegelberg via JD)
HBASE-2786 TestHLog.testSplit hangs (Nicolas Spiegelberg via JD)
HBASE-2790 Purge apache-forrest from TRUNK
+ HBASE-2793 Add ability to extract a specified list of versions of a column
+ in a single roundtrip (Kannan via Ryan)
NEW FEATURES
HBASE-1961 HBase EC2 scripts
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java?rev=962700&r1=962699&r2=962700&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/Filter.java Fri Jul 9 21:36:40 2010
@@ -102,6 +102,10 @@ public interface Filter extends Writable
*/
SKIP,
/**
+ * Skip this column. Go to the next column in this row.
+ */
+ NEXT_COL,
+ /**
* Done with columns, skip to next row. Note that filterRow() will
* still be called.
*/
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java?rev=962700&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/filter/TimestampsFilter.java Fri Jul 9 21:36:40 2010
@@ -0,0 +1,91 @@
+package org.apache.hadoop.hbase.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Filter that returns only cells whose timestamp (version) is
+ * in the specified list of timestamps (versions).
+ * <p>
+ * Note: Use of this filter overrides any time range/time stamp
+ * options specified using
+ * {@link org.apache.hadoop.hbase.client.Get#setTimeRange(long, long)},
+ * {@link org.apache.hadoop.hbase.client.Scan#setTimeRange(long, long)},
+ * {@link org.apache.hadoop.hbase.client.Get#setTimeStamp(long)},
+ * or {@link org.apache.hadoop.hbase.client.Scan#setTimeStamp(long)}.
+ */
+public class TimestampsFilter extends FilterBase {
+
+  // The requested timestamps. A sorted set, so duplicates collapse and
+  // the smallest requested timestamp is available via first().
+  TreeSet<Long> timestamps;
+
+  // Used during scans to hint the scan to stop early
+  // once the timestamps fall below the minTimeStamp.
+  // Stays at Long.MAX_VALUE when no timestamps were requested.
+  long minTimeStamp = Long.MAX_VALUE;
+
+  /**
+   * Used during deserialization. Do not use otherwise.
+   */
+  public TimestampsFilter() {
+    super();
+  }
+
+  /**
+   * Constructor for filter that retains only those
+   * cells whose timestamp (version) is in the specified
+   * list of timestamps.
+   *
+   * @param timestamps the timestamps (versions) to retain; order does not
+   *        matter and duplicates are ignored. A null list, or a null
+   *        element in it, results in a NullPointerException.
+   */
+  public TimestampsFilter(List<Long> timestamps) {
+    this.timestamps = new TreeSet<Long>(timestamps);
+    init();
+  }
+
+  /**
+   * Recomputes minTimeStamp from the current set of timestamps.
+   * It is always reassigned (Long.MAX_VALUE for an empty set), so an
+   * instance reused by readFields() never keeps a stale minimum from
+   * a previous deserialization.
+   */
+  private void init() {
+    if (this.timestamps.isEmpty()) {
+      minTimeStamp = Long.MAX_VALUE;
+    } else {
+      minTimeStamp = this.timestamps.first();
+    }
+  }
+
+  /**
+   * Gets the minimum timestamp requested by filter.
+   * @return minimum timestamp requested by filter, or Long.MAX_VALUE
+   *         if no timestamps were requested.
+   */
+  public long getMin() {
+    return minTimeStamp;
+  }
+
+  /**
+   * Includes a cell if its timestamp was requested. Otherwise the cell is
+   * skipped. If its timestamp is below the smallest requested one, the
+   * filter asks to move on to the next column instead.
+   */
+  @Override
+  public ReturnCode filterKeyValue(KeyValue v) {
+    long ts = v.getTimestamp();
+    if (this.timestamps.contains(ts)) {
+      return ReturnCode.INCLUDE;
+    } else if (ts < minTimeStamp) {
+      // The remaining versions of this column are guaranteed
+      // to be lesser than all of the other values.
+      return ReturnCode.NEXT_COL;
+    }
+    return ReturnCode.SKIP;
+  }
+
+  /**
+   * Reads a count followed by that many longs (the format produced by
+   * write()), replacing any previously held timestamps.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numTimestamps = in.readInt();
+    this.timestamps = new TreeSet<Long>();
+    for (int idx = 0; idx < numTimestamps; idx++) {
+      this.timestamps.add(in.readLong());
+    }
+    init();
+  }
+
+  /**
+   * Writes the number of timestamps followed by each timestamp
+   * in ascending order.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int numTimestamps = this.timestamps.size();
+    out.writeInt(numTimestamps);
+    for (Long timestamp : this.timestamps) {
+      out.writeLong(timestamp);
+    }
+  }
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=962700&r1=962699&r2=962700&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Jul 9 21:36:40 2010
@@ -173,8 +173,10 @@ public class ScanQueryMatcher extends Qu
if (filterResponse == ReturnCode.SKIP)
return MatchCode.SKIP;
-
+ else if (filterResponse == ReturnCode.NEXT_COL)
+ return MatchCode.SEEK_NEXT_COL;
// else if (filterResponse == ReturnCode.NEXT_ROW)
+
stickyNextRow = true;
return MatchCode.SEEK_NEXT_ROW;
}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java?rev=962700&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestTimestampsFilter.java Fri Jul 9 21:36:40 2010
@@ -0,0 +1,342 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Run tests related to {@link TimestampsFilter} using HBase client APIs.
+ * Sets up the HBase mini cluster once at start. Each creates a table
+ * named for the method and does its stuff against that.
+ */
+public class TestTimestampsFilter {
+ final Log LOG = LogFactory.getLog(getClass());
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ // Nothing to do.
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @After
+ public void tearDown() throws Exception {
+ // Nothing to do.
+ }
+
+ /**
+ * Test from client side for TimestampsFilter.
+ *
+ * The TimestampsFilter provides the ability to request cells (KeyValues)
+ * whose timestamp/version is in the specified list of timestamps/version.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testTimestampsFilter() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+ KeyValue kvs[];
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ // insert versions 201..300
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
+ // insert versions 1..100
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
+ }
+ }
+
+ // do some verification before flush
+ verifyInsertedValues(ht, FAMILY);
+
+ flush();
+
+ // do some verification after flush
+ verifyInsertedValues(ht, FAMILY);
+
+ // Insert some more versions after flush. These should be in memstore.
+ // After this we should have data in both memstore & HFiles.
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
+ putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
+ }
+ }
+
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
+ Arrays.asList(505L, 5L, 105L, 305L, 205L));
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
+ checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
+ checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
+ checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
+ }
+ }
+
+ // Request an empty list of versions using the Timestamps filter;
+ // Should return none.
+ kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
+ assertEquals(0, kvs.length);
+
+ //
+ // Test the filter using a Scan operation
+ // Scan rows 0..4. For each row, get all its columns, but only
+ // those versions of the columns with the specified timestamps.
+ Result[] results = scanNVersions(ht, FAMILY, 0, 4,
+ Arrays.asList(6L, 106L, 306L));
+ assertEquals("# of rows returned from scan", 5, results.length);
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ kvs = results[rowIdx].raw();
+ // each row should have 5 columns.
+ // And we have requested 3 versions for each.
+ assertEquals("Number of KeyValues in result for row:" + rowIdx,
+ 3*5, kvs.length);
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ int offset = colIdx * 3;
+ checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
+ checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
+ checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
+ }
+ }
+ }
+
+ /**
+ * Test TimestampsFilter in the presence of version deletes.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWithVersionDeletes() throws Exception {
+
+ // first test from memstore (without flushing).
+ testWithVersionDeletes(false);
+
+ // run same test against HFiles (by forcing a flush).
+ testWithVersionDeletes(true);
+ }
+
+ private void testWithVersionDeletes(boolean flushTables) throws IOException {
+ byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
+ (flushTables ? "flush" : "noflush"));
+ byte [] FAMILY = Bytes.toBytes("event_log");
+ byte [][] FAMILIES = new byte[][] { FAMILY };
+
+ // create table; set versions to max...
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+ // For row:0, col:0: insert versions 1 through 5.
+ putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+ // delete version 4.
+ deleteOneVersion(ht, FAMILY, 0, 0, 4);
+
+ if (flushTables) {
+ flush();
+ }
+
+ // request a bunch of versions including the deleted version. We should
+ // only get back entries for the versions that exist.
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+ assertEquals(3, kvs.length);
+ checkOneCell(kvs[0], FAMILY, 0, 0, 5);
+ checkOneCell(kvs[1], FAMILY, 0, 0, 3);
+ checkOneCell(kvs[2], FAMILY, 0, 0, 2);
+ }
+
+ private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
+ for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
+ for (int colIdx = 0; colIdx < 5; colIdx++) {
+ // ask for versions that exist.
+ KeyValue[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
+ Arrays.asList(5L, 300L, 6L, 80L));
+ assertEquals(4, kvs.length);
+ checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+ checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
+ checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
+ checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
+
+ // ask for versions that do not exist.
+ kvs = getNVersions(ht, cf, rowIdx, colIdx,
+ Arrays.asList(101L, 102L));
+ assertEquals(0, kvs.length);
+
+ // ask for some versions that exist and some that do not.
+ kvs = getNVersions(ht, cf, rowIdx, colIdx,
+ Arrays.asList(1L, 300L, 105L, 70L, 115L));
+ assertEquals(3, kvs.length);
+ checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
+ checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
+ checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
+ }
+ }
+ }
+
+ // Flush tables. Since flushing is asynchronous, sleep for a bit.
+ private void flush() throws IOException {
+ TEST_UTIL.flush();
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException i) {
+ // ignore
+ }
+ }
+
+ /**
+ * Assert that the passed in KeyValue has expected contents for the
+ * specified row, column & timestamp.
+ */
+ private void checkOneCell(KeyValue kv, byte[] cf,
+ int rowIdx, int colIdx, long ts) {
+
+ String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
+
+ assertEquals("Row mismatch which checking: " + ctx,
+ "row:"+ rowIdx, Bytes.toString(kv.getRow()));
+
+ assertEquals("ColumnFamily mismatch while checking: " + ctx,
+ Bytes.toString(cf), Bytes.toString(kv.getFamily()));
+
+ assertEquals("Column qualifier mismatch while checking: " + ctx,
+ "column:" + colIdx,
+ Bytes.toString(kv.getQualifier()));
+
+ assertEquals("Timestamp mismatch while checking: " + ctx,
+ ts, kv.getTimestamp());
+
+ assertEquals("Value mismatch while checking: " + ctx,
+ "value-version-" + ts, Bytes.toString(kv.getValue()));
+ }
+
+ /**
+ * Uses the TimestampFilter on a Get to request a specified list of
+ * versions for the row/column specified by rowIdx & colIdx.
+ *
+ */
+ private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
+ int colIdx, List<Long> versions)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Filter filter = new TimestampsFilter(versions);
+ Get get = new Get(row);
+ get.addColumn(cf, column);
+ get.setFilter(filter);
+ get.setMaxVersions();
+ Result result = ht.get(get);
+
+ return result.raw();
+ }
+
+ /**
+ * Uses the TimestampFilter on a Scan to request a specified list of
+ * versions for the rows from startRowIdx to endRowIdx (both inclusive).
+ */
+ private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
+ int endRowIdx, List<Long> versions)
+ throws IOException {
+ byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
+ byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); // exclusive
+ Filter filter = new TimestampsFilter(versions);
+ Scan scan = new Scan(startRow, endRow);
+ scan.setFilter(filter);
+ scan.setMaxVersions();
+ ResultScanner scanner = ht.getScanner(scan);
+ return scanner.next(endRowIdx - startRowIdx + 1);
+ }
+
+ /**
+ * Insert in specific row/column versions with timestamps
+ * versionStart..versionEnd.
+ */
+ private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
+ long versionStart, long versionEnd)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Put put = new Put(row);
+
+ for (long idx = versionStart; idx <= versionEnd; idx++) {
+ put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
+ }
+
+ ht.put(put);
+ }
+
+ /**
+ * For row/column specified by rowIdx/colIdx, delete the cell
+ * corresponding to the specified version.
+ */
+ private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
+ int colIdx, long version)
+ throws IOException {
+ byte row[] = Bytes.toBytes("row:" + rowIdx);
+ byte column[] = Bytes.toBytes("column:" + colIdx);
+ Delete del = new Delete(row);
+ del.deleteColumn(cf, column, version);
+ ht.delete(del);
+ }
+}
+