You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/11/16 01:02:45 UTC
svn commit: r1410118 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: liyin
Date: Fri Nov 16 00:02:44 2012
New Revision: 1410118
URL: http://svn.apache.org/viewvc?rev=1410118&view=rev
Log:
[HBASE-2376] Introduce an API to perform FlashBackQueries.
Author: pritam
Summary:
Updated the relevant parts of the scanning code so that the
FlashBackQuery API is fully supported. I also changed the Writable
serialization for Get and Scan. I removed some version checking during write
that I felt was unnecessary (I may not be clear on why we had it, so feel
free to point out anything I obviously missed).
This is on top of D606550
Test Plan: Unit tests added.
Reviewers: kannan, kranganathan, liyintang, aaiyer, adela, cjin
Reviewed By: liyintang
CC: hbase-eng@, erling
Differential Revision: https://phabricator.fb.com/D618553
Task ID: 1733764
Added:
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java Fri Nov 16 00:02:44 2012
@@ -1677,18 +1677,29 @@ public class KeyValue implements Writabl
return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
}
+ public static KeyValue createFirstDeleteFamilyOnRow(final byte[] row,
+ final byte[] family) {
+ return createDeleteFamilyOnRow(row, family,
+ HConstants.LATEST_TIMESTAMP);
+ }
+
/**
* Create a Delete Family KeyValue for the specified row and family that would
* be smaller than all other possible Delete Family KeyValues that have the
- * same row and family.
- * Used for seeking.
- * @param row - row key (arbitrary byte array)
- * @param family - family name
- * @return First Delete Family possible key on passed <code>row</code>.
- */
- public static KeyValue createFirstDeleteFamilyOnRow(final byte [] row,
- final byte [] family) {
- return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,
+ * same row and family. Used for seeking.
+ *
+ * @param row
+ * - row key (arbitrary byte array)
+ * @param family
+ * - family name
+ * @param ts
+ * - timestamp
+ * @return the Delete Family possible key on passed <code>row</code>
+ * and <code>ts</code>.
+ */
+ public static KeyValue createDeleteFamilyOnRow(final byte [] row,
+ final byte[] family, long ts) {
+ return new KeyValue(row, family, null, ts,
Type.DeleteFamily);
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java Fri Nov 16 00:02:44 2012
@@ -69,7 +69,10 @@ import java.util.TreeSet;
*/
public class Get extends OperationWithAttributes
implements Writable, Row, Comparable<Row> {
- private static final byte GET_VERSION = (byte)3;
+ private static final byte STORE_LIMIT_VERSION = (byte) 2;
+ private static final byte STORE_OFFSET_VERSION = (byte) 3;
+ private static final byte FLASHBACK_VERSION = (byte) 4;
+ private static final byte GET_VERSION = FLASHBACK_VERSION;
private byte [] row = null;
private long lockId = -1L;
@@ -80,6 +83,8 @@ public class Get extends OperationWithAt
private TimeRange tr = new TimeRange();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+ // Operation should be performed as if it was performed at the given ts.
+ private long effectiveTS = HConstants.LATEST_TIMESTAMP;
/** Constructor for Writable. DO NOT USE */
public Get() {}
@@ -183,6 +188,16 @@ public class Get extends OperationWithAt
}
/**
+ * Set the effective timestamp for this get.
+ *
+ * @return this for invocation chaining
+ */
+ public Get setEffectiveTS(long effectiveTS) {
+ this.effectiveTS = effectiveTS;
+ return this;
+ }
+
+ /**
* Get up to the specified number of versions of each column.
* @param maxVersions maximum versions for each column
* @throws IOException if invalid number of versions
@@ -270,6 +285,13 @@ public class Get extends OperationWithAt
}
/**
+ * @return the effective timestamp of this operation.
+ */
+ public long getEffectiveTS() {
+ return this.effectiveTS;
+ }
+
+ /**
* Method for retrieving the get's maximum number of values
* to return per Column Family
* @return the maximum number of values to fetch per CF
@@ -412,12 +434,15 @@ public class Get extends OperationWithAt
this.row = Bytes.readByteArray(in);
this.lockId = in.readLong();
this.maxVersions = in.readInt();
- if (version > 1) {
+ if (version >= STORE_LIMIT_VERSION) {
this.storeLimit = in.readInt();
}
- if (version > 2) {
+ if (version >= STORE_OFFSET_VERSION) {
this.storeOffset = in.readInt();
}
+ if (version >= FLASHBACK_VERSION) {
+ effectiveTS = in.readLong();
+ }
boolean hasFilter = in.readBoolean();
if (hasFilter) {
this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in)));
@@ -446,22 +471,29 @@ public class Get extends OperationWithAt
public void write(final DataOutput out)
throws IOException {
- byte version = (byte)1;
- if (this.storeOffset != 0) {
- version = GET_VERSION;
+ // We try to talk a protocol version as low as possible so that we can be
+ // backward compatible as far as possible.
+ byte version = (byte) 1;
+ if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
+ version = FLASHBACK_VERSION;
+ } else if (this.storeOffset != 0) {
+ version = STORE_OFFSET_VERSION;
} else if (this.storeLimit != -1) {
- version = (byte)2;
+ version = STORE_LIMIT_VERSION;
}
out.writeByte(version);
Bytes.writeByteArray(out, this.row);
out.writeLong(this.lockId);
out.writeInt(this.maxVersions);
- if (version > 1) {
+ if (version >= STORE_LIMIT_VERSION) {
out.writeInt(this.storeLimit);
}
- if (version > 2) {
+ if (version >= STORE_OFFSET_VERSION) {
out.writeInt(this.storeOffset);
}
+ if (version >= FLASHBACK_VERSION) {
+ out.writeLong(effectiveTS);
+ }
if(this.filter == null) {
out.writeBoolean(false);
} else {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Fri Nov 16 00:02:44 2012
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
@@ -83,11 +84,12 @@ import java.util.TreeSet;
* execute {@link #setCacheBlocks(boolean)}.
*/
public class Scan extends Operation implements Writable {
- private static final int VERSION_STORE_LIMIT = 2;
- private static final int VERSION_STORE_OFFSET = 3;
- private static final int VERSION_RESPONSE_SIZE = 4;
- private static final byte SCAN_VERSION = VERSION_RESPONSE_SIZE;
-
+ private static final byte STORE_LIMIT_VERSION = (byte)2;
+ private static final byte STORE_OFFSET_VERSION = (byte)3;
+ private static final byte RESPONSE_SIZE_VERSION = (byte)4;
+ private static final byte FLASHBACK_VERSION = (byte) 5;
+ private static final byte SCAN_VERSION = FLASHBACK_VERSION;
+
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
@@ -103,6 +105,7 @@ public class Scan extends Operation impl
private TimeRange tr = new TimeRange();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+ private long effectiveTS = HConstants.LATEST_TIMESTAMP;
/**
* Create a Scan operation across all rows.
@@ -180,6 +183,7 @@ public class Scan extends Operation impl
this.storeOffset = get.getRowOffsetPerColumnFamily();
this.tr = get.getTimeRange();
this.familyMap = get.getFamilyMap();
+ this.effectiveTS = get.getEffectiveTS();
}
public boolean isGetScan() {
@@ -282,6 +286,16 @@ public class Scan extends Operation impl
}
/**
+ * Set the effective timestamp of this operation
+ *
+ * @return this
+ */
+ public Scan setEffectiveTS(long effectiveTS) {
+ this.effectiveTS = effectiveTS;
+ return this;
+ }
+
+ /**
* Get all available versions.
* @return this
*/
@@ -465,6 +479,13 @@ public class Scan extends Operation impl
}
/**
+ * @return the effective timestamp for this operation
+ */
+ public long getEffectiveTS() {
+ return this.effectiveTS;
+ }
+
+ /**
* @return maximum number of values to return for a single call to next()
*/
public int getBatch() {
@@ -641,16 +662,19 @@ public class Scan extends Operation impl
this.stopRow = Bytes.readByteArray(in);
this.maxVersions = in.readInt();
this.batch = in.readInt();
- if (version >= VERSION_STORE_LIMIT) {
+ if (version >= STORE_LIMIT_VERSION) {
this.storeLimit = in.readInt();
}
- if (version >= VERSION_STORE_OFFSET) {
+ if (version >= STORE_OFFSET_VERSION) {
this.storeOffset = in.readInt();
}
- if (version >= VERSION_RESPONSE_SIZE) {
+ if (version >= RESPONSE_SIZE_VERSION) {
this.maxResponseSize = in.readInt();
this.partialRow = in.readBoolean();
}
+ if (version >= FLASHBACK_VERSION) {
+ effectiveTS = in.readLong();
+ }
this.caching = in.readInt();
this.cacheBlocks = in.readBoolean();
if(in.readBoolean()) {
@@ -676,13 +700,18 @@ public class Scan extends Operation impl
public void write(final DataOutput out)
throws IOException {
- byte version = (byte)1;
- if (this.maxResponseSize != HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) {
- version = (byte) VERSION_RESPONSE_SIZE;
+ // We try to talk a protocol version as low as possible so that we can be
+ // backward compatible as far as possible.
+ byte version = (byte) 1;
+ if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
+ version = FLASHBACK_VERSION;
+ } else if (this.maxResponseSize
+ != HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) {
+ version = (byte) RESPONSE_SIZE_VERSION;
} else if (this.storeOffset != 0) {
- version = (byte)VERSION_STORE_OFFSET;
+ version = STORE_OFFSET_VERSION;
} else if (this.storeLimit != -1) {
- version = (byte)VERSION_STORE_LIMIT;
+ version = STORE_LIMIT_VERSION;
}
out.writeByte(version);
@@ -690,16 +719,19 @@ public class Scan extends Operation impl
Bytes.writeByteArray(out, this.stopRow);
out.writeInt(this.maxVersions);
out.writeInt(this.batch);
- if (version >= VERSION_STORE_LIMIT) {
+ if (version >= STORE_LIMIT_VERSION) {
out.writeInt(this.storeLimit);
}
- if (version >= VERSION_STORE_OFFSET) {
+ if (version >= STORE_OFFSET_VERSION) {
out.writeInt(this.storeOffset);
}
- if (version >= VERSION_RESPONSE_SIZE) {
+ if (version >= RESPONSE_SIZE_VERSION) {
out.writeInt(this.maxResponseSize);
out.writeBoolean(this.partialRow);
}
+ if (version >= FLASHBACK_VERSION) {
+ out.writeLong(effectiveTS);
+ }
out.writeInt(this.caching);
out.writeBoolean(this.cacheBlocks);
if(this.filter == null) {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Nov 16 00:02:44 2012
@@ -77,6 +77,8 @@ public class ScanQueryMatcher {
private final long oldestFlashBackTS;
+ private final long effectiveTS;
+
/**
* Constructs a ScanQueryMatcher for a Scan.
* @param scan
@@ -111,8 +113,9 @@ public class ScanQueryMatcher {
this.rowComparator = rowComparator;
this.deletes = new ScanDeleteTracker();
this.stopRow = scan.getStopRow();
- this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),
- family);
+ this.effectiveTS = scan.getEffectiveTS();
+ this.startKey = KeyValue.createDeleteFamilyOnRow(scan.getStartRow(),
+ family, effectiveTS);
this.filter = scan.getFilter();
this.retainDeletesInOutputUntil = retainDeletesInOutputUntil;
this.maxReadPointToTrackVersions = readPointToUse;
@@ -171,6 +174,9 @@ public class ScanQueryMatcher {
* caused by a data corruption.
*/
public MatchCode match(KeyValue kv) throws IOException {
+ if (kv.getTimestamp() > effectiveTS) {
+ return MatchCode.SEEK_TO_EFFECTIVE_TS;
+ }
if (filter != null && filter.filterAllRemaining()) {
return MatchCode.DONE_SCAN;
}
@@ -389,6 +395,10 @@ public class ScanQueryMatcher {
null, 0, 0);
}
+ public KeyValue getKeyForEffectiveTSOnRow(KeyValue kv) {
+ return kv.createFirstOnRowColTS(effectiveTS);
+ }
+
/**
* {@link #match} return codes. These instruct the scanner moving through
* memstores and StoreFiles what to do with the current KeyValue.
@@ -440,6 +450,8 @@ public class ScanQueryMatcher {
*/
SEEK_NEXT_USING_HINT,
+ SEEK_TO_EFFECTIVE_TS,
+
/**
* Include KeyValue and done with column, seek to next.
*/
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Fri Nov 16 00:02:44 2012
@@ -93,7 +93,9 @@ public class StoreScanner extends NonLaz
this.scan = scan;
this.keyValueAggregator = keyValueAggregator;
this.columns = columns;
- oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl
+ long currentTime = (scan.getEffectiveTS() == HConstants.LATEST_TIMESTAMP) ? EnvironmentEdgeManager
+ .currentTimeMillis() : scan.getEffectiveTS();
+ oldestUnexpiredTS = currentTime - ttl
- flashBackQueryLimit;
// We look up row-column Bloom filters for multi-column queries as part of
@@ -398,6 +400,10 @@ public class StoreScanner extends NonLaz
}
switch(qcode) {
+
+ case SEEK_TO_EFFECTIVE_TS:
+ reseek(matcher.getKeyForEffectiveTSOnRow(kv));
+ break;
case INCLUDE:
case INCLUDE_AND_SEEK_NEXT_ROW:
case INCLUDE_AND_SEEK_NEXT_COL:
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java?rev=1410118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java Fri Nov 16 00:02:44 2012
@@ -0,0 +1,197 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.BeforeClass;
+
+public class FlashBackQueryTestUtil {
+
+ private static final Log LOG = LogFactory
+ .getLog(FlashBackQueryTestUtil.class);
+ protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ protected static Random random = new Random();
+ protected static final int CURRENT_TIME = 60000;
+ protected static final int MAX_MAXVERSIONS = 100;
+ protected static final byte[][] families = new byte[][] { "a".getBytes(),
+ "b".getBytes(), "c".getBytes(), "d".getBytes(), "e".getBytes(),
+ "f".getBytes(), "g".getBytes(), "h".getBytes(), "i".getBytes(),
+ "j".getBytes() };
+
+ protected static class KVComparator implements Comparator<KeyValue> {
+
+ @Override
+ public int compare(KeyValue kv1, KeyValue kv2) {
+ return (kv1.getTimestamp() < kv2.getTimestamp() ? -1 : (kv1
+ .getTimestamp() > kv2.getTimestamp() ? 1 : 0));
+ }
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ ManualEnvironmentEdge mock = new ManualEnvironmentEdge();
+ mock.setValue(CURRENT_TIME);
+ EnvironmentEdgeManagerTestHelper.injectEdge(mock);
+ TEST_UTIL.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ EnvironmentEdgeManagerTestHelper.reset();
+ }
+
+ protected HTable setupTable(int ttl, int flashBackQueryLimit, int maxVersions)
+ throws Exception {
+ LOG.info("maxVersions : " + maxVersions);
+ LOG.info("TTL : " + ttl);
+ LOG.info("FBQ: " + flashBackQueryLimit);
+ byte[] tableName = ("loadTable" + random.nextInt()).getBytes();
+ HColumnDescriptor[] hcds = new HColumnDescriptor[random
+ .nextInt(families.length - 1) + 1];
+ for (int i = 0; i < hcds.length; i++) {
+ hcds[i] = new HColumnDescriptor(families[i]);
+ hcds[i].setTimeToLive(ttl);
+ hcds[i].setFlashBackQueryLimit(flashBackQueryLimit);
+ hcds[i].setMaxVersions(maxVersions);
+ }
+ return TEST_UTIL.createTable(tableName, hcds);
+ }
+
+ protected void flushAllRegions() throws Exception {
+ HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+ for (HRegion region : server.getOnlineRegions()) {
+ server.flushRegion(region.getRegionName());
+ }
+ }
+
+ protected KeyValue getRandomKv(byte[] row, HColumnDescriptor hcd, int start,
+ int size, boolean isDelete) {
+ byte[] value = new byte[10];
+ random.nextBytes(value);
+ long ts = start + random.nextInt(size);
+ if (isDelete) {
+ Type type = null;
+ if (random.nextBoolean()) {
+ type = Type.DeleteColumn;
+ } else {
+ type = Type.DeleteFamily;
+ }
+ return new KeyValue(row, hcd.getName(), null, ts, type,
+ value);
+ }
+ return new KeyValue(row, hcd.getName(), null, ts, value);
+ }
+
+ protected void majorCompact(byte[] tableName, HColumnDescriptor[] hcds)
+ throws Exception {
+ HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+ for (HRegion region : server.getOnlineRegions()) {
+ if (!new String(region.getTableDesc().getName()).equals(new String(
+ tableName))) {
+ LOG.info("Skipping since name is : "
+ + new String(region.getTableDesc().getName()));
+ continue;
+ }
+ for (HColumnDescriptor hcd : hcds) {
+ Store store = region.getStore(hcd.getName());
+ store.compactRecentForTesting(-1);
+ }
+ }
+ }
+
+ protected void processHeapKvs(
+ HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
+ byte[] row, HColumnDescriptor hcd, KeyValue kv) {
+ HashMap<String, PriorityQueue<KeyValue>> rowMap = heapKvs.get(new String(
+ row));
+ if (rowMap == null) {
+ rowMap = new HashMap<String, PriorityQueue<KeyValue>>();
+ heapKvs.put(new String(row), rowMap);
+ }
+ PriorityQueue<KeyValue> q = rowMap.get(new String(hcd.getName()));
+ if (kv.isDelete()) {
+ if (q != null) {
+ // Timestamps appear in increasing order.
+ LOG.info("Clearing out at : " + kv);
+ q.clear();
+ }
+ return;
+ }
+ if (q == null) {
+ q = new PriorityQueue<KeyValue>(1, new KVComparator());
+ rowMap.put(new String(hcd.getName()), q);
+ }
+ q.add(kv);
+ LOG.info("Added kv : " + kv);
+ if (q.size() > hcd.getMaxVersions()) {
+ q.poll();
+ }
+
+ }
+
+ protected void setStoreProps(byte[] tableName, HColumnDescriptor[] hcds,
+ boolean def) {
+ HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+ for (HRegion region : server.getOnlineRegions()) {
+ if (!new String(region.getTableDesc().getName()).equals(new String(
+ tableName))) {
+ LOG.info("Skipping since name is : "
+ + new String(region.getTableDesc().getName()));
+ continue;
+ }
+ for (HColumnDescriptor hcd : hcds) {
+ Store store = region.getStore(hcd.getName());
+ // Reset back to original values.
+ if (def) {
+ store.ttl = hcd.getTimeToLive() * 1000;
+ store.getFamily().setMaxVersions(hcd.getMaxVersions());
+ } else {
+ store.ttl = HConstants.FOREVER;
+ store.getFamily().setMaxVersions(Integer.MAX_VALUE);
+ }
+ }
+ }
+ }
+
+ protected boolean inHeapKvs(KeyValue kv,
+ HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs) {
+ HashMap<String, PriorityQueue<KeyValue>> rowMap = heapKvs.get(new String(kv
+ .getRow()));
+ if (rowMap == null)
+ return false;
+ return rowMap.get(new String(kv.getFamily())).contains(kv);
+ }
+
+ protected void verifyHeapKvs(
+ HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
+ HashSet<KeyValue> tableSet) {
+ for (HashMap<String, PriorityQueue<KeyValue>> rowMap : heapKvs.values()) {
+ for (PriorityQueue<KeyValue> q : rowMap.values()) {
+ for (KeyValue kv : q) {
+ assertTrue("KV in heapKvs: " + kv + " does not exist in table",
+ tableSet.contains(kv));
+ }
+ }
+ }
+ }
+
+}
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java?rev=1410118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java Fri Nov 16 00:02:44 2012
@@ -0,0 +1,219 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class TestFlashBackQuery extends FlashBackQueryTestUtil {
+ private static Log LOG = LogFactory.getLog(TestFlashBackQuery.class);
+ private static final HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs = new HashMap<String, HashMap<String, PriorityQueue<KeyValue>>>();
+
+ private HashMap<Long, Integer> getMap(long effectiveTS, HTable table)
+ throws Exception {
+ Scan scan = new Scan();
+ scan.setMaxVersions();
+ scan.setEffectiveTS(effectiveTS);
+ HashMap<Long, Integer> tableMap = new HashMap<Long, Integer>();
+ for (Result result : table.getScanner(scan)) {
+ for (KeyValue kv : result.list()) {
+ Integer val = tableMap.get(kv.getTimestamp());
+ int ival = (val == null) ? 0 : val;
+ tableMap.put(kv.getTimestamp(), ival + 1);
+ LOG.info("Got kv : " + kv + " effective : " + effectiveTS);
+ }
+ }
+ return tableMap;
+ }
+
+ private void verify(HTable table) throws Exception {
+ HashMap<Long, Integer> tableMap = getMap(HConstants.LATEST_TIMESTAMP, table);
+
+ assertEquals(10, (int) tableMap.get(50000L));
+ assertEquals(10, (int) tableMap.get(40000L));
+ verify1(table);
+ }
+
+ private void verify1(HTable table) throws Exception {
+ verify1(table, 0);
+ }
+
+ private void verify1(HTable table, int deleteVersions) throws Exception {
+ int expected3k = 10 - deleteVersions;
+ HashMap<Long, Integer> tableMap = getMap(35000, table);
+
+ assertEquals(expected3k, (int) tableMap.get(30000L));
+ assertEquals(10, (int) tableMap.get(20000L));
+ }
+
+ @Test
+ public void testTTL() throws Exception {
+ HTable table = setupTable(20, 30, 10);
+ byte[] row = new byte[10];
+ for (int i = 0; i < 10; i++) {
+ random.nextBytes(row);
+ Put put = new Put(row);
+ put.add(new KeyValue(row, families[0], families[0], 50000, row));
+ put.add(new KeyValue(row, families[0], families[0], 40000, row));
+ put.add(new KeyValue(row, families[0], families[0], 30000, row));
+ put.add(new KeyValue(row, families[0], families[0], 20000, row));
+ put.add(new KeyValue(row, families[0], families[0], 10000, row));
+ table.put(put);
+
+ Get get = new Get(row);
+ get.addColumn(families[0], families[0]);
+ get.setEffectiveTS(35000);
+ List<KeyValue> kvs = table.get(get).list();
+ assertEquals(1, kvs.size());
+ assertEquals(kvs.get(0), new KeyValue(row, families[0], families[0],
+ 30000, row));
+ }
+
+ verify(table);
+ flushAllRegions();
+ verify(table);
+ majorCompact(table.getTableName(), table.getTableDescriptor()
+ .getColumnFamilies());
+ verify(table);
+ }
+
+ @Test
+ public void testMaxVersions() throws Exception {
+ HTable table = setupTable(60, 60, 2);
+ byte[] row = new byte[10];
+ for (int i = 0; i < 10; i++) {
+ random.nextBytes(row);
+ Put put = new Put(row);
+ put.add(new KeyValue(row, families[0], families[0], 50000, row));
+ put.add(new KeyValue(row, families[0], families[0], 40000, row));
+ put.add(new KeyValue(row, families[0], families[0], 30000, row));
+ put.add(new KeyValue(row, families[0], families[0], 20000, row));
+ put.add(new KeyValue(row, families[0], families[0], 10000, row));
+ table.put(put);
+
+ Get get = new Get(row);
+ get.addColumn(families[0], families[0]);
+ get.setEffectiveTS(35000);
+ List<KeyValue> kvs = table.get(get).list();
+ assertEquals(1, kvs.size());
+ assertEquals(kvs.get(0), new KeyValue(row, families[0], families[0],
+ 30000, row));
+ }
+
+ verify(table);
+ flushAllRegions();
+ verify(table);
+ majorCompact(table.getTableName(), table.getTableDescriptor()
+ .getColumnFamilies());
+ verify(table);
+ }
+
+ @Test
+ public void testDeletes() throws Exception {
+ HTable table = setupTable(20, 30, 2);
+ byte[] row = new byte[10];
+ int deleteVersions = 0;
+ for (int i = 0; i < 10; i++) {
+ random.nextBytes(row);
+ Put put = new Put(row);
+ put.add(new KeyValue(row, families[0], null, 50000, row));
+ put.add(new KeyValue(row, families[0], null, 40000, row));
+ put.add(new KeyValue(row, families[0], null, 30000, row));
+ put.add(new KeyValue(row, families[0], null, 20000, row));
+ put.add(new KeyValue(row, families[0], null, 10000, row));
+ table.put(put);
+ Delete delete = new Delete(row);
+ int n = random.nextInt(4);
+ if (n == 0) {
+ delete.deleteFamily(families[0], 40000);
+ } else if (n == 1) {
+ delete.deleteRow(40000);
+ } else if (n == 2) {
+ delete.deleteColumns(families[0], null, 40000);
+ } else {
+ deleteVersions++;
+ delete.deleteColumn(families[0], null, 30000);
+ }
+ table.delete(delete);
+ }
+ verify1(table, deleteVersions);
+ flushAllRegions();
+ verify1(table, deleteVersions);
+ majorCompact(table.getTableName(), table.getTableDescriptor()
+ .getColumnFamilies());
+ verify1(table, deleteVersions);
+ }
+
+ private KeyValue processKV(byte[] row, HColumnDescriptor hcd, int start,
+ int size, long effectiveTS) {
+ boolean isDelete = random.nextBoolean();
+ KeyValue kv = getRandomKv(row, hcd, start, size, isDelete);
+ if (kv.getTimestamp() <= effectiveTS) {
+ processHeapKvs(heapKvs, row, hcd, kv);
+ }
+ return kv;
+ }
+
+ private void verifyRandom(long effectiveTS, HTable table) throws Exception {
+ Scan scan = new Scan();
+ scan.setMaxVersions(Integer.MAX_VALUE);
+ scan.setEffectiveTS(effectiveTS);
+ HashSet<KeyValue> tableSet = new HashSet<KeyValue>();
+ for (Result result : table.getScanner(scan)) {
+ for (KeyValue kv : result.list()) {
+ assertTrue("KV : " + kv + " should not exist", inHeapKvs(kv, heapKvs));
+ tableSet.add(kv);
+ }
+ }
+
+ verifyHeapKvs(heapKvs, tableSet);
+ }
+
+ @Test
+ public void testRandom() throws Exception {
+ int ttl = random.nextInt(CURRENT_TIME/1000);
+ int flashBackQueryLimit = random.nextInt(CURRENT_TIME/1000);
+ int maxVersions = random.nextInt(MAX_MAXVERSIONS);
+ long effectiveTS = CURRENT_TIME
+ - random.nextInt(flashBackQueryLimit * 1000 + 1);
+ LOG.info("PARAMS : " + ttl + " : " + flashBackQueryLimit + " : "
+ + maxVersions + " : " + effectiveTS);
+ HTable table = setupTable(ttl, flashBackQueryLimit, maxVersions);
+ HColumnDescriptor[] hcds = table.getTableDescriptor().getColumnFamilies();
+ byte[] row = new byte[10];
+ for (int i = 0; i < 10; i++) {
+ random.nextBytes(row);
+ Put put = new Put(row);
+ int size = CURRENT_TIME / MAX_MAXVERSIONS;
+ for (HColumnDescriptor hcd : hcds) {
+ for (int versions = 0, start = 0; versions < MAX_MAXVERSIONS; versions++, start += size) {
+ put.add(this.processKV(row, hcd, start, size, effectiveTS));
+ }
+ }
+ table.put(put);
+ }
+ verifyRandom(effectiveTS, table);
+ flushAllRegions();
+ verifyRandom(effectiveTS, table);
+ majorCompact(table.getTableName(), table.getTableDescriptor()
+ .getColumnFamilies());
+ verifyRandom(effectiveTS, table);
+ }
+}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java Fri Nov 16 00:02:44 2012
@@ -19,17 +19,15 @@
*/
package org.apache.hadoop.hbase.regionserver;
-import java.util.Comparator;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.PriorityQueue;
-import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.client.Delete;
@@ -37,87 +35,23 @@ import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.InjectionEvent;
import org.apache.hadoop.hbase.util.InjectionHandler;
-import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
-import org.junit.AfterClass;
import static org.junit.Assert.*;
-import org.junit.BeforeClass;
import org.junit.Test;
-public class TestFlashBackQueryCompaction {
+public class TestFlashBackQueryCompaction extends FlashBackQueryTestUtil {
- private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static Random random = new Random();
- private static final int CURRENT_TIME = 60000;
- private static final int MAX_MAXVERSIONS = 100;
private static final HashSet<KeyValue> goodKvs = new HashSet<KeyValue>();
private static final HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs = new HashMap<String, HashMap<String, PriorityQueue<KeyValue>>>();
private static final HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvsNormalScan = new HashMap<String, HashMap<String, PriorityQueue<KeyValue>>>();
private static final Log LOG = LogFactory
.getLog(TestFlashBackQueryCompaction.class);
- private static final byte[][] families = new byte[][] { "a".getBytes(),
- "b".getBytes(), "c".getBytes(), "d".getBytes(), "e".getBytes(),
- "f".getBytes(), "g".getBytes(), "h".getBytes(), "i".getBytes(),
- "j".getBytes() };
-
- private static class KVComparator implements Comparator<KeyValue> {
-
- @Override
- public int compare(KeyValue kv1, KeyValue kv2) {
- return (kv1.getTimestamp() < kv2.getTimestamp() ? -1 : (kv1
- .getTimestamp() > kv2.getTimestamp() ? 1 : 0));
- }
- }
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
-
- ManualEnvironmentEdge mock = new ManualEnvironmentEdge();
- mock.setValue(CURRENT_TIME);
- EnvironmentEdgeManagerTestHelper.injectEdge(mock);
- TEST_UTIL.startMiniCluster(1);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
- EnvironmentEdgeManagerTestHelper.reset();
- }
-
- private void processHeapKvs(
- HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
- byte[] row, HColumnDescriptor hcd, KeyValue kv) {
- HashMap<String, PriorityQueue<KeyValue>> rowMap = heapKvs.get(new String(
- row));
- if (rowMap == null) {
- rowMap = new HashMap<String, PriorityQueue<KeyValue>>();
- heapKvs.put(new String(row), rowMap);
- }
- PriorityQueue<KeyValue> q = rowMap.get(new String(hcd.getName()));
- if (q == null) {
- q = new PriorityQueue<KeyValue>(1, new KVComparator());
- rowMap.put(new String(hcd.getName()), q);
- }
- q.add(kv);
- if (q.size() > hcd.getMaxVersions()) {
- q.poll();
- }
- }
-
- private KeyValue getRandomKv(byte[] row, HColumnDescriptor hcd, int start,
- int size) {
- byte[] value = new byte[10];
- random.nextBytes(value);
- long ts = start + random.nextInt(size);
- return new KeyValue(row, hcd.getName(), null, ts, value);
- }
private KeyValue processKV(byte[] row, HColumnDescriptor hcd, int start,
int size) {
- KeyValue kv = getRandomKv(row, hcd, start, size);
+ KeyValue kv = getRandomKv(row, hcd, start, size, false);
if (kv.getTimestamp() >= CURRENT_TIME - hcd.getTimeToLive() * 1000) {
processHeapKvs(heapKvsNormalScan, row, hcd, kv);
}
@@ -133,30 +67,6 @@ public class TestFlashBackQueryCompactio
return kv;
}
- private HTable setupTable(int ttl, int flashBackQueryLimit, int maxVersions)
- throws Exception {
- LOG.info("maxVersions : " + maxVersions);
- LOG.info("TTL : " + ttl);
- LOG.info("FBQ: " + flashBackQueryLimit);
- byte[] tableName = ("loadTable" + random.nextInt()).getBytes();
- HColumnDescriptor[] hcds = new HColumnDescriptor[random
- .nextInt(families.length - 1) + 1];
- for (int i = 0; i < hcds.length; i++) {
- hcds[i] = new HColumnDescriptor(families[i]);
- hcds[i].setTimeToLive(ttl);
- hcds[i].setFlashBackQueryLimit(flashBackQueryLimit);
- hcds[i].setMaxVersions(maxVersions);
- }
- return TEST_UTIL.createTable(tableName, hcds);
- }
-
- private void flushAllRegions() throws Exception {
- HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
- for (HRegion region : server.getOnlineRegions()) {
- server.flushRegion(region.getRegionName());
- }
- }
-
private void loadTable(int ttl, int flashBackQueryLimit, int maxVersions)
throws Exception {
HTable table = setupTable(ttl, flashBackQueryLimit, maxVersions);
@@ -219,69 +129,6 @@ public class TestFlashBackQueryCompactio
verifyHeapKvs(heapKvsNormalScan, tableSet);
}
- private void majorCompact(byte[] tableName, HColumnDescriptor[] hcds)
- throws Exception {
- HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
- for (HRegion region : server.getOnlineRegions()) {
- if (!new String(region.getTableDesc().getName()).equals(new String(
- tableName))) {
- LOG.info("Skipping since name is : "
- + new String(region.getTableDesc().getName()));
- continue;
- }
- for (HColumnDescriptor hcd : hcds) {
- Store store = region.getStore(hcd.getName());
- store.compactRecentForTesting(-1);
- }
- }
- }
-
- private void setStoreProps(byte[] tableName, HColumnDescriptor[] hcds,
- boolean def) {
- HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
- for (HRegion region : server.getOnlineRegions()) {
- if (!new String(region.getTableDesc().getName()).equals(new String(
- tableName))) {
- LOG.info("Skipping since name is : "
- + new String(region.getTableDesc().getName()));
- continue;
- }
- for (HColumnDescriptor hcd : hcds) {
- Store store = region.getStore(hcd.getName());
- // Reset back to original values.
- if (def) {
- store.ttl = hcd.getTimeToLive() * 1000;
- store.getFamily().setMaxVersions(hcd.getMaxVersions());
- } else {
- store.ttl = HConstants.FOREVER;
- store.getFamily().setMaxVersions(Integer.MAX_VALUE);
- }
- }
- }
- }
-
- private void verifyHeapKvs(
- HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
- HashSet<KeyValue> tableSet) {
- for (HashMap<String, PriorityQueue<KeyValue>> rowMap : heapKvs.values()) {
- for (PriorityQueue<KeyValue> q : rowMap.values()) {
- for (KeyValue kv : q) {
- assertTrue("KV in heapKvs: " + kv + " does not exist in table",
- tableSet.contains(kv));
- }
- }
- }
- }
-
- private boolean inHeapKvs(KeyValue kv,
- HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs) {
- HashMap<String, PriorityQueue<KeyValue>> rowMap = heapKvs.get(new String(kv
- .getRow()));
- if (rowMap == null)
- return false;
- return rowMap.get(new String(kv.getFamily())).contains(kv);
- }
-
@Test
public void testRandom() throws Exception {
loadTable(random.nextInt(CURRENT_TIME) / 1000,