You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/05 18:00:02 UTC
svn commit: r572980 - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Author: stack
Date: Wed Sep 5 09:00:01 2007
New Revision: 572980
URL: http://svn.apache.org/viewvc?rev=572980&view=rev
Log:
HADOOP-1834 Scanners ignore timestamp passed on creation
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
(addContent): Added overloads that allow specifying a timestamp.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
Made the test inherit from HBaseTestCase instead of HBaseClusterTestCase
so that tests that do not use a cluster could be added.
(testTimestampScanning): Added test for hadoop-1834 bug.
(testTimestamp): Refactored to remove duplicated code.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
(getNext): Make it respect the timestamp set on construction.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
Removed eclipse yellow flag warnings around empty parens and
auto-boxing longs.
(getNext): Make it respect the timestamp set on construction.
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=572980&r1=572979&r2=572980&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Wed Sep 5 09:00:01 2007
@@ -30,6 +30,7 @@
HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
HADOOP-1832 listTables() returns duplicate tables
+ HADOOP-1834 Scanners ignore timestamp passed on creation
IMPROVEMENTS
HADOOP-1737 Make HColumnDescriptor data publically members settable
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=572980&r1=572979&r2=572980&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Wed Sep 5 09:00:01 2007
@@ -58,7 +58,9 @@
/**
* Constructor
*/
- public HMemcache() {}
+ public HMemcache() {
+ super();
+ }
/** represents the state of the memcache at a specified point in time */
static class Snapshot {
@@ -68,7 +70,7 @@
Snapshot(final TreeMap<HStoreKey, byte[]> memcache, final Long i) {
super();
this.memcacheSnapshot = memcache;
- this.sequenceId = i;
+ this.sequenceId = i.longValue();
}
}
@@ -95,7 +97,8 @@
if(memcache.size() == 0) {
return null;
}
- Snapshot retval = new Snapshot(memcache, log.startCacheFlush());
+ Snapshot retval =
+ new Snapshot(memcache, Long.valueOf(log.startCacheFlush()));
this.snapshot = memcache;
history.add(memcache);
memcache = new TreeMap<HStoreKey, byte []>();
@@ -294,18 +297,16 @@
final Iterator<HStoreKey> keyIterators[];
@SuppressWarnings("unchecked")
- HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
- throws IOException {
-
+ HMemcacheScanner(final long timestamp, final Text targetCols[],
+ final Text firstRow)
+ throws IOException {
super(timestamp, targetCols);
-
lock.obtainReadLock();
try {
this.backingMaps = new TreeMap[history.size() + 1];
//NOTE: Since we iterate through the backing maps from 0 to n, we need
- // to put the memcache first, the newest history second, ..., etc.
-
+ // to put the memcache first, the newest history second, ..., etc.
backingMaps[0] = memcache;
for(int i = history.size() - 1; i > 0; i--) {
backingMaps[i] = history.elementAt(i);
@@ -364,13 +365,25 @@
*/
@Override
boolean getNext(int i) {
- if (!keyIterators[i].hasNext()) {
- closeSubScanner(i);
- return false;
- }
- this.keys[i] = keyIterators[i].next();
- this.vals[i] = backingMaps[i].get(keys[i]);
- return true;
+ boolean result = false;
+ while (true) {
+ if (!keyIterators[i].hasNext()) {
+ closeSubScanner(i);
+ break;
+ }
+ // Check key timestamp is <= the timestamp passed for this scanner.
+ HStoreKey hsk = keyIterators[i].next();
+ if (hsk == null) {
+ throw new NullPointerException("Unexpected null key");
+ }
+ if (hsk.getTimestamp() <= this.timestamp) {
+ this.keys[i] = hsk;
+ this.vals[i] = backingMaps[i].get(keys[i]);
+ result = true;
+ break;
+ }
+ }
+ return result;
}
/** Shut down an individual map iterator. */
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=572980&r1=572979&r2=572980&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Wed Sep 5 09:00:01 2007
@@ -528,7 +528,7 @@
*/
public boolean needsCompaction() {
return this.storefiles != null &&
- this.storefiles.size() >= this.compactionThreshold;
+ this.storefiles.size() >= this.compactionThreshold;
}
/**
@@ -1334,13 +1334,20 @@
*/
@Override
boolean getNext(int i) throws IOException {
+ boolean result = false;
ImmutableBytesWritable ibw = new ImmutableBytesWritable();
- if (!readers[i].next(keys[i], ibw)) {
- closeSubScanner(i);
- return false;
+ while (true) {
+ if (!readers[i].next(keys[i], ibw)) {
+ closeSubScanner(i);
+ break;
+ }
+ if (keys[i].getTimestamp() <= this.timestamp) {
+ vals[i] = ibw.get();
+ result = true;
+ break;
+ }
}
- vals[i] = ibw.get();
- return true;
+ return result;
}
/** Close down the indicated reader. */
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=572980&r1=572979&r2=572980&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Wed Sep 5 09:00:01 2007
@@ -39,6 +39,8 @@
protected FileSystem localFs = null;
protected static final char FIRST_CHAR = 'a';
protected static final char LAST_CHAR = 'z';
+ protected static final byte [] START_KEY_BYTES =
+ {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
static {
StaticTestEnvironment.initialize();
@@ -117,9 +119,9 @@
Text endKey = r.getRegionInfo().getEndKey();
byte [] startKeyBytes = startKey.getBytes();
if (startKeyBytes == null || startKeyBytes.length == 0) {
- startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
+ startKeyBytes = START_KEY_BYTES;
}
- addContent(new HRegionLoader(r), column, startKeyBytes, endKey);
+ addContent(new HRegionLoader(r), column, startKeyBytes, endKey, -1);
}
/**
@@ -132,8 +134,7 @@
*/
protected static void addContent(final Loader updater, final String column)
throws IOException {
- addContent(updater, column,
- new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR}, null);
+ addContent(updater, column, START_KEY_BYTES, null);
}
/**
@@ -149,6 +150,23 @@
protected static void addContent(final Loader updater, final String column,
final byte [] startKeyBytes, final Text endKey)
throws IOException {
+ addContent(updater, column, startKeyBytes, endKey, -1);
+ }
+
+ /**
+ * Add content to region <code>r</code> on the passed column
+ * <code>column</code>.
+ * Adds data of the form 'aaa', 'aab', etc. where key and value are the same.
+ * @param updater An instance of {@link Loader}.
+ * @param column
+ * @param startKeyBytes Where to start the rows inserted
+ * @param endKey Where to stop inserting rows.
+ * @param ts Timestamp to write the content with.
+ * @throws IOException
+ */
+ protected static void addContent(final Loader updater, final String column,
+ final byte [] startKeyBytes, final Text endKey, final long ts)
+ throws IOException {
// Add rows of three characters. The first character starts with the
// 'a' character and runs up to 'z'. Per first character, we run the
// second character over same range. And same for the third so rows
@@ -167,7 +185,11 @@
long lockid = updater.startBatchUpdate(t);
try {
updater.put(lockid, new Text(column), bytes);
- updater.commit(lockid);
+ if (ts == -1) {
+ updater.commit(lockid);
+ } else {
+ updater.commit(lockid, ts);
+ }
lockid = -1;
} finally {
if (lockid != -1) {
@@ -190,6 +212,7 @@
public long startBatchUpdate(final Text row) throws IOException;
public void put(long lockid, Text column, byte val[]) throws IOException;
public void commit(long lockid) throws IOException;
+ public void commit(long lockid, long ts) throws IOException;
public void abort(long lockid) throws IOException;
}
@@ -208,6 +231,9 @@
public void commit(long lockid) throws IOException {
this.region.commit(lockid, System.currentTimeMillis());
}
+ public void commit(long lockid, final long ts) throws IOException {
+ this.region.commit(lockid, ts);
+ }
public void put(long lockid, Text column, byte[] val) throws IOException {
this.region.put(lockid, column, val);
}
@@ -230,6 +256,9 @@
}
public void commit(long lockid) throws IOException {
this.table.commit(lockid);
+ }
+ public void commit(long lockid, final long ts) throws IOException {
+ this.table.commit(lockid, ts);
}
public void put(long lockid, Text column, byte[] val) throws IOException {
this.table.put(lockid, column, val);
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java?rev=572980&r1=572979&r2=572980&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java Wed Sep 5 09:00:01 2007
@@ -18,13 +18,18 @@
package org.apache.hadoop.hbase;
+import java.io.IOException;
import java.util.TreeMap;
+
import org.apache.hadoop.io.Text;
-/** Tests user specifyable time stamps */
-public class TestTimestamp extends HBaseClusterTestCase {
+/**
+ * Tests user specifiable time stamps
+ */
+public class TestTimestamp extends HBaseTestCase {
private static final long T0 = 10L;
private static final long T1 = 100L;
+ private static final long T2 = 200L;
private static final String COLUMN_NAME = "contents:";
private static final String TABLE_NAME = "test";
@@ -37,157 +42,226 @@
};
private static final Text TABLE = new Text(TABLE_NAME);
private static final Text ROW = new Text("row");
-
- private HTable table;
- /** constructor */
- public TestTimestamp() {
- super();
+ /**
+ * Test that delete works according to description in <a
+ * href="https://issues.apache.org/jira/browse/HADOOP-1784">hadoop-1784</a>
+ * when it comes to timestamps.
+ * @throws IOException
+ */
+ public void testDelete() throws IOException {
+ HRegion r = createRegion();
+ try {
+ HRegionLoader loader = new HRegionLoader(r);
+ // Add a couple of values for three different timestamps.
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T2);
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
+ // If I delete w/o specifying a timestamp, this means I'm deleting the
+ // latest.
+ delete(r, System.currentTimeMillis());
+ // Verify that I get back T2 through T0.
+ } finally {
+ r.close();
+ r.getLog().closeAndDelete();
+ }
}
-
- /** {@inheritDoc} */
- @Override
- public void setUp() throws Exception {
- super.setUp();
-
- HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
- desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
-
+
+ private void delete(final HRegion r, final long ts) throws IOException {
+ long lockid = r.startUpdate(ROW);
+ r.delete(lockid, COLUMN);
+ r.commit(lockid, ts == -1? System.currentTimeMillis(): ts);
+ }
+
+ /**
+ * Test scanning against different timestamps.
+ * @throws IOException
+ */
+ public void testTimestampScanning() throws IOException {
+ HRegion r = createRegion();
try {
- HBaseAdmin admin = new HBaseAdmin(conf);
- admin.createTable(desc);
-
- } catch (Exception e) {
- e.printStackTrace();
- fail();
+ HRegionLoader loader = new HRegionLoader(r);
+ // Add a couple of values for three different timestamps.
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T0);
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"), T1);
+ addContent(loader, COLUMN_NAME, START_KEY_BYTES, new Text("aad"));
+ // Get count of latest items.
+ int count = assertScanContentTimestamp(r, System.currentTimeMillis());
+ // Assert I get same count when I scan at each timestamp.
+ assertEquals(count, assertScanContentTimestamp(r, T0));
+ assertEquals(count, assertScanContentTimestamp(r, T1));
+ // Flush everything out to disk and then retry
+ r.flushcache(false);
+ assertEquals(count, assertScanContentTimestamp(r, T0));
+ assertEquals(count, assertScanContentTimestamp(r, T1));
+ } finally {
+ r.close();
+ r.getLog().closeAndDelete();
}
}
- /** the test */
- public void testTimestamp() {
+ /*
+ * Assert that the scan returns only values with timestamp <= ts.
+ * @param r
+ * @param ts
+ * @return Count of items scanned.
+ * @throws IOException
+ */
+ private int assertScanContentTimestamp(final HRegion r, final long ts)
+ throws IOException {
+ int count = 0;
+ HInternalScannerInterface scanner =
+ r.getScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts, null);
try {
- table = new HTable(conf, TABLE);
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte []>value = new TreeMap<Text, byte[]>();
+ while (scanner.next(key, value)) {
+ assertTrue(key.getTimestamp() <= ts);
+ Text row = key.getRow();
+ assertEquals(row.toString(),
+ new String(value.get(COLUMN), HConstants.UTF8_ENCODING));
+ count++;
+ value.clear();
+ }
+ } finally {
+ scanner.close();
+ }
+ return count;
+ }
+
+ /**
+ * Basic test of timestamps.
+ * TODO: Needs rewrite after hadoop-1784 gets fixed.
+ * @throws IOException
+ */
+ public void testTimestamps() throws IOException {
+ MiniHBaseCluster cluster = new MiniHBaseCluster(this.conf, 1);
+ try {
+ HTable table = createTable();
// store a value specifying an update time
-
- long lockid = table.startUpdate(ROW);
- table.put(lockid, COLUMN, VERSION1.getBytes(HConstants.UTF8_ENCODING));
- table.commit(lockid, T0);
+ put(table, VERSION1.getBytes(HConstants.UTF8_ENCODING), T0);
// store a value specifying 'now' as the update time
-
- lockid = table.startUpdate(ROW);
- table.put(lockid, COLUMN, LATEST.getBytes(HConstants.UTF8_ENCODING));
- table.commit(lockid);
+ put(table, LATEST.getBytes(HConstants.UTF8_ENCODING), -1);
// delete values older than T1
-
- lockid = table.startUpdate(ROW);
+ long lockid = table.startUpdate(ROW);
table.delete(lockid, COLUMN);
table.commit(lockid, T1);
// now retrieve...
-
- // the most recent version:
-
- byte[] bytes = table.get(ROW, COLUMN);
- assertTrue(bytes != null && bytes.length != 0);
- assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
-
- // any version <= time T1
-
- byte[][] values = table.get(ROW, COLUMN, T1, 3);
- assertNull(values);
-
- // the version from T0
-
- values = table.get(ROW, COLUMN, T0, 3);
- assertTrue(values.length == 1
- && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
+ assertGets(table);
// flush everything out to disk
-
HRegionServer s = cluster.regionThreads.get(0).getRegionServer();
for(HRegion r: s.onlineRegions.values() ) {
r.flushcache(false);
}
// now retrieve...
-
- // the most recent version:
-
- bytes = table.get(ROW, COLUMN);
- assertTrue(bytes != null && bytes.length != 0);
- assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
-
- // any version <= time T1
-
- values = table.get(ROW, COLUMN, T1, 3);
- assertNull(values);
-
- // the version from T0
-
- values = table.get(ROW, COLUMN, T0, 3);
- assertTrue(values.length == 1
- && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
-
- // three versions older than now
-
- values = table.get(ROW, COLUMN, 3);
- assertTrue(values.length == 1
- && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
+ assertGets(table);
// Test scanners
-
- HScannerInterface scanner =
- table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW);
- try {
- HStoreKey key = new HStoreKey();
- TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
- int count = 0;
- while(scanner.next(key, results)) {
- count++;
- }
- assertEquals(count, 1);
- assertEquals(results.size(), 1);
-
- } finally {
- scanner.close();
- }
-
- scanner = table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, T1);
- try {
- HStoreKey key = new HStoreKey();
- TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
- int count = 0;
- while(scanner.next(key, results)) {
- count++;
- }
- assertEquals(count, 0);
- assertEquals(results.size(), 0);
-
- } finally {
- scanner.close();
- }
-
- scanner = table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, T0);
- try {
- HStoreKey key = new HStoreKey();
- TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
- int count = 0;
- while(scanner.next(key, results)) {
- count++;
- }
- assertEquals(count, 0);
- assertEquals(results.size(), 0);
-
- } finally {
- scanner.close();
+ assertScanCount(table, -1, 1);
+ assertScanCount(table, T1, 0);
+ } catch (Exception e) {
+ cluster.shutdown();
+ }
+ }
+
+ /*
+ * Test count of results scanning.
+ * @param table
+ * @param ts
+ * @param expectedCount
+ * @throws IOException
+ */
+ private void assertScanCount(final HTable table, final long ts,
+ final int expectedCount)
+ throws IOException {
+ HScannerInterface scanner = (ts == -1)?
+ table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW):
+ table.obtainScanner(COLUMNS, HConstants.EMPTY_START_ROW, ts);
+ try {
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ int count = 0;
+ while(scanner.next(key, results)) {
+ count++;
}
+ assertEquals(count, expectedCount);
+ assertEquals(results.size(), expectedCount);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
+ } finally {
+ scanner.close();
+ }
+ }
+
+ /*
+ * Test can do basic gets.
+ * Used by testTimestamps above.
+ * @param table
+ * @throws IOException
+ */
+ private void assertGets(final HTable table) throws IOException {
+ // the most recent version:
+ byte[] bytes = table.get(ROW, COLUMN);
+ assertTrue(bytes != null && bytes.length != 0);
+ assertTrue(LATEST.equals(new String(bytes, HConstants.UTF8_ENCODING)));
+
+ // any version <= time T1
+ byte[][] values = table.get(ROW, COLUMN, T1, 3);
+ assertNull(values);
+
+ // the version from T0
+ values = table.get(ROW, COLUMN, T0, 3);
+ assertTrue(values.length == 1
+ && VERSION1.equals(new String(values[0], HConstants.UTF8_ENCODING)));
+
+ // three versions older than now
+ values = table.get(ROW, COLUMN, 3);
+ assertTrue(values.length == 1
+ && LATEST.equals(new String(values[0], HConstants.UTF8_ENCODING)));
+ }
+
+ /*
+ * Put values.
+ * @param table
+ * @param bytes
+ * @param ts
+ * @throws IOException
+ */
+ private void put(final HTable table, final byte [] bytes, final long ts)
+ throws IOException {
+ long lockid = table.startUpdate(ROW);
+ table.put(lockid, COLUMN, bytes);
+ if (ts == -1) {
+ table.commit(lockid);
+ } else {
+ table.commit(lockid, ts);
}
}
-}
+
+ /*
+ * Create a table named TABLE_NAME.
+ * @return An instance of an HTable connected to the created table.
+ * @throws IOException
+ */
+ private HTable createTable() throws IOException {
+ HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(COLUMN_NAME));
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+ return new HTable(conf, TABLE);
+ }
+
+ private HRegion createRegion() throws IOException {
+ HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
+ HTableDescriptor htd = createTableDescriptor(getName());
+ htd.addFamily(new HColumnDescriptor(COLUMN_NAME));
+ HRegionInfo hri = new HRegionInfo(1, htd, null, null);
+ return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+ }
+}
\ No newline at end of file