You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/10 17:56:18 UTC
svn commit: r574287 [1/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/
src/test/org/apache/hadoop/hbase/
Author: stack
Date: Mon Sep 10 08:56:16 2007
New Revision: 574287
URL: http://svn.apache.org/viewvc?rev=574287&view=rev
Log:
HADOOP-1784 Delete: Fix scanners and gets so they work properly in presence
of deletes. Added a deleteAll to remove all cells equal to or older than
passed timestamp. Fixed compaction so deleted cells do not make it out
into compacted output. Ensure also that versions > column max are dropped
compacting.
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner2.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Mon Sep 10 08:56:16 2007
@@ -8,6 +8,11 @@
NEW FEATURES
HADOOP-1768 FS command using Hadoop FsShell operations
(Edward Yoon via Stack)
+ HADOOP-1784 Delete: Fix scanners and gets so they work properly in presence
+ of deletes. Added a deleteAll to remove all cells equal to or
+ older than passed timestamp. Fixed compaction so deleted cells
+ do not make it out into compacted output. Ensure also that
+ versions > column max are dropped compacting.
OPTIMIZATIONS
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HAbstractScanner.java Mon Sep 10 08:56:16 2007
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Vector;
import java.util.regex.Pattern;
@@ -205,7 +206,7 @@
*
* @see org.apache.hadoop.hbase.HScannerInterface#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
*/
- public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
+ public boolean next(HStoreKey key, SortedMap<Text, byte []> results)
throws IOException {
// Find the next row label (and timestamp)
Text chosenRow = null;
@@ -218,7 +219,6 @@
|| (keys[i].getRow().compareTo(chosenRow) < 0)
|| ((keys[i].getRow().compareTo(chosenRow) == 0)
&& (keys[i].getTimestamp() > chosenTimestamp)))) {
-
chosenRow = new Text(keys[i].getRow());
chosenTimestamp = keys[i].getTimestamp();
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java Mon Sep 10 08:56:16 2007
@@ -103,7 +103,7 @@
// be the first to be reassigned if the server(s) they are being served by
// should go down.
- /** The root table's name. */
+ /** The root table's name.*/
static final Text ROOT_TABLE_NAME = new Text("-ROOT-");
/** The META table's name. */
@@ -139,10 +139,28 @@
static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
// Other constants
- /** used by scanners, etc when they want to start at the beginning of a region */
- static final Text EMPTY_START_ROW = new Text();
+ /**
+ * An empty instance of Text.
+ */
+ static final Text EMPTY_TEXT = new Text();
+
+ /**
+ * Used by scanners, etc when they want to start at the beginning of a region
+ */
+ static final Text EMPTY_START_ROW = EMPTY_TEXT;
/** When we encode strings, we always specify UTF8 encoding */
static final String UTF8_ENCODING = "UTF-8";
+ /**
+ * Timestamp to use when we want to refer to the latest cell.
+ * This is the timestamp sent by clients when no timestamp is specified on
+ * commit.
+ */
+ static final long LATEST_TIMESTAMP = Long.MAX_VALUE;
+
+ /**
+ * Define for 'return-all-versions'.
+ */
+ static final int ALL_VERSIONS = -1;
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HInternalScannerInterface.java Mon Sep 10 08:56:16 2007
@@ -19,11 +19,6 @@
*/
package org.apache.hadoop.hbase;
-import java.io.IOException;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.Text;
-
/**
* Internally, we need to be able to determine if the scanner is doing wildcard
* column matches (when only a column family is specified or if a column regex
@@ -31,29 +26,7 @@
* specified. If so, we need to ignore the timestamp to ensure that we get all
* the family members, as they may have been last updated at different times.
*/
-public interface HInternalScannerInterface {
-
- /**
- * Grab the next row's worth of values. The HScanner will return the most
- * recent data value for each row that is not newer than the target time.
- *
- * If a dataFilter is defined, it will be used to skip rows that do not
- * match its criteria. It may cause the scanner to stop prematurely if it
- * knows that it will no longer accept the remaining results.
- *
- * @param key HStoreKey containing row and timestamp
- * @param results Map of column/value pairs
- * @return true if a value was found
- * @throws IOException
- */
- public boolean next(HStoreKey key, TreeMap<Text, byte []> results)
- throws IOException;
-
- /**
- * Close the scanner.
- */
- public void close();
-
+public interface HInternalScannerInterface extends HScannerInterface {
/** @return true if the scanner is matching a column family or regex */
public boolean isWildcardScanner();
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Mon Sep 10 08:56:16 2007
@@ -175,18 +175,15 @@
* @return An array of byte arrays ordered by timestamp.
*/
public byte [][] get(final HStoreKey key, final int numVersions) {
- List<byte []> results = new ArrayList<byte[]>();
this.lock.obtainReadLock();
try {
- ArrayList<byte []> result =
- get(memcache, key, numVersions - results.size());
- results.addAll(0, result);
+ ArrayList<byte []> results = get(memcache, key, numVersions);
for (int i = history.size() - 1; i >= 0; i--) {
if (numVersions > 0 && results.size() >= numVersions) {
break;
}
- result = get(history.elementAt(i), key, numVersions - results.size());
- results.addAll(results.size(), result);
+ results.addAll(results.size(),
+ get(history.elementAt(i), key, numVersions - results.size()));
}
return (results.size() == 0)?
null: ImmutableBytesWritable.toArray(results);
@@ -194,7 +191,6 @@
this.lock.releaseReadLock();
}
}
-
/**
* Return all the available columns for the given key. The key indicates a
@@ -248,7 +244,8 @@
* @param map
* @param key
* @param numVersions
- * @return Ordered list of items found in passed <code>map</code>
+ * @return Ordered list of items found in passed <code>map</code>. If no
+ * matching values, returns an empty list (does not return null).
*/
ArrayList<byte []> get(final TreeMap<HStoreKey, byte []> map,
final HStoreKey key, final int numVersions) {
@@ -261,21 +258,87 @@
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
HStoreKey itKey = es.getKey();
if (itKey.matchesRowCol(curKey)) {
- if(HGlobals.deleteBytes.compareTo(es.getValue()) == 0) {
- // TODO: Shouldn't this be a continue rather than a break? Perhaps
- // the intent is that this DELETE_BYTES is meant to suppress older
- // info -- see 5.4 Compactions in BigTable -- but how does this jibe
- // with being able to remove one version only?
- break;
+ if (!isDeleted(es.getValue())) {
+ result.add(tailMap.get(itKey));
+ curKey.setVersion(itKey.getTimestamp() - 1);
}
- result.add(tailMap.get(itKey));
- curKey.setVersion(itKey.getTimestamp() - 1);
}
if (numVersions > 0 && result.size() >= numVersions) {
break;
}
}
return result;
+ }
+
+ /**
+ * Get <code>versions</code> keys matching the origin key's
+ * row/column/timestamp and those of an older vintage
+ * Default access so can be accessed out of {@link HRegionServer}.
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+ * {@link HConstants#ALL_VERSIONS} to retrieve all.
+ * @return Ordered list of <code>versions</code> keys going from newest back.
+ * @throws IOException
+ */
+ List<HStoreKey> getKeys(final HStoreKey origin, final int versions) {
+ this.lock.obtainReadLock();
+ try {
+ List<HStoreKey> results = getKeys(this.memcache, origin, versions);
+ for (int i = history.size() - 1; i >= 0; i--) {
+ results.addAll(results.size(), getKeys(history.elementAt(i), origin,
+ versions == HConstants.ALL_VERSIONS? versions:
+ (results != null? versions - results.size(): versions)));
+ }
+ return results;
+ } finally {
+ this.lock.releaseReadLock();
+ }
+ }
+
+ /*
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+ * {@link HConstants#ALL_VERSIONS} to retrieve all.
+ * @return List of all keys that are of the same row and column and of
+ * equal or older timestamp. If no keys, returns an empty List. Does not
+ * return null.
+ */
+ private List<HStoreKey> getKeys(final TreeMap<HStoreKey, byte []> map,
+ final HStoreKey origin, final int versions) {
+ List<HStoreKey> result = new ArrayList<HStoreKey>();
+ SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
+ for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
+ HStoreKey key = es.getKey();
+ if (!key.matchesRowCol(origin)) {
+ break;
+ }
+ if (!isDeleted(es.getValue())) {
+ result.add(key);
+ if (versions != HConstants.ALL_VERSIONS && result.size() >= versions) {
+ // We have enough results. Return.
+ break;
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @param key
+ * @return True if there is an entry and its content is {@link HGlobals#deleteBytes}.
+ * Used when checking values in the store. On occasion the memcache holds the
+ * fact that the cell has been deleted.
+ */
+ boolean isDeleted(final HStoreKey key) {
+ return isDeleted(this.memcache.get(key));
+ }
+
+ /**
+ * @param value
+ * @return True if the value is non-null and its content is {@link HGlobals#deleteBytes}.
+ */
+ boolean isDeleted(final byte [] value) {
+ return (value == null)? false: HGlobals.deleteBytes.compareTo(value) == 0;
}
/**
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Mon Sep 10 08:56:16 2007
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Vector;
@@ -581,6 +582,9 @@
lock.obtainReadLock();
try {
HStore.HStoreSize biggest = largestHStore(midKey);
+ if (biggest == null) {
+ return false;
+ }
long triggerSize =
this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
boolean split = (biggest.getAggregate() >= triggerSize);
@@ -911,26 +915,47 @@
}
}
- /** Private implementation: get the value for the indicated HStoreKey */
- private byte [][] get(HStoreKey key, int numVersions) throws IOException {
-
+ private byte [][] get(final HStoreKey key, final int numVersions)
+ throws IOException {
lock.obtainReadLock();
try {
// Check the memcache
- byte [][] result = memcache.get(key, numVersions);
- if(result != null) {
- return result;
+ byte [][] memcacheResult = this.memcache.get(key, numVersions);
+ // If we got sufficient versions from memcache, return.
+ if (memcacheResult != null && memcacheResult.length == numVersions) {
+ return memcacheResult;
}
- // If unavailable in memcache, check the appropriate HStore
+ // Check hstore for more versions.
Text colFamily = HStoreKey.extractFamily(key.getColumn());
HStore targetStore = stores.get(colFamily);
if(targetStore == null) {
- return null;
+ // There are no stores. Return what we got from memcache.
+ return memcacheResult;
}
-
- return targetStore.get(key, numVersions);
+ // Update the number of versions we need to fetch from the store.
+ int amendedNumVersions = numVersions;
+ if (memcacheResult != null) {
+ amendedNumVersions -= memcacheResult.length;
+ }
+ byte [][] result =
+ targetStore.get(key, amendedNumVersions, this.memcache);
+ if (result == null) {
+ result = memcacheResult;
+ } else if (memcacheResult != null) {
+ // We have results from both memcache and from stores. Put them
+ // together in an array in the proper order.
+ byte [][] storeResult = result;
+ result = new byte [memcacheResult.length + result.length][];
+ for (int i = 0; i < memcacheResult.length; i++) {
+ result[i] = memcacheResult[i];
+ }
+ for (int i = 0; i < storeResult.length; i++) {
+ result[i + memcacheResult.length] = storeResult[i];
+ }
+ }
+ return result;
} finally {
lock.releaseReadLock();
}
@@ -963,6 +988,45 @@
}
/**
+ * Get all keys matching the origin key's row/column/timestamp and those
+ * of an older vintage
+ * Default access so can be accessed out of {@link HRegionServer}.
+ * @param origin Where to start searching.
+ * @return Ordered list of keys going from newest on back.
+ * @throws IOException
+ */
+ List<HStoreKey> getKeys(final HStoreKey origin) throws IOException {
+ return getKeys(origin, ALL_VERSIONS);
+ }
+
+ /**
+ * Get <code>versions</code> keys matching the origin key's
+ * row/column/timestamp and those of an older vintage
+ * Default access so can be accessed out of {@link HRegionServer}.
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+ * {@link HConstants#ALL_VERSIONS} to retrieve all.
+ * @return Ordered list of <code>versions</code> keys going from newest back.
+ * @throws IOException
+ */
+ List<HStoreKey> getKeys(final HStoreKey origin, final int versions)
+ throws IOException {
+ List<HStoreKey> keys = this.memcache.getKeys(origin, versions);
+ if (versions != ALL_VERSIONS && keys.size() >= versions) {
+ return keys;
+ }
+ // Check hstore for more versions.
+ Text colFamily = HStoreKey.extractFamily(origin.getColumn());
+ HStore targetStore = stores.get(colFamily);
+ if (targetStore != null) {
+ // Pass versions without modification since the store's getKeys includes
+ // the size of the passed <code>keys</code> list when counting.
+ keys = targetStore.getKeys(origin, keys, versions);
+ }
+ return keys;
+ }
+
+ /**
* Return an iterator that scans over the HRegion, returning the indicated
* columns for only the rows that match the data filter. This Iterator must be closed by the caller.
*
@@ -1110,8 +1174,8 @@
}
/**
- * Delete a value or write a value. This is a just a convenience method for put().
- *
+ * Delete a value or write a value.
+ * This is just a convenience method for put().
* @param lockid lock id obtained from startUpdate
* @param targetCol name of column to be deleted
* @throws IOException
@@ -1119,6 +1183,51 @@
public void delete(long lockid, Text targetCol) throws IOException {
localput(lockid, targetCol, HGlobals.deleteBytes.get());
}
+
+ /**
+ * Delete all cells of the same age as the passed timestamp or older.
+ * @param row
+ * @param column
+ * @param ts Delete all entries that have this timestamp or older
+ * @throws IOException
+ */
+ public void deleteAll(final Text row, final Text column, final long ts)
+ throws IOException {
+ deleteMultiple(row, column, ts, ALL_VERSIONS);
+ }
+
+ /**
+ * Delete one or many cells.
+ * Used to support {@link #deleteAll(Text, Text, long)} and deletion of
+ * latest cell.
+ * @param row
+ * @param column
+ * @param ts Timestamp to start search on.
+ * @param versions How many versions to delete. Pass
+ * {@link HConstants#ALL_VERSIONS} to delete all.
+ * @throws IOException
+ */
+ void deleteMultiple(final Text row, final Text column, final long ts,
+ final int versions)
+ throws IOException {
+ lock.obtainReadLock();
+ try {
+ checkColumn(column);
+ HStoreKey origin = new HStoreKey(row, column, ts);
+ synchronized(row) {
+ List<HStoreKey> keys = getKeys(origin, versions);
+ if (keys.size() > 0) {
+ TreeMap<Text, byte []> edits = new TreeMap<Text, byte []>();
+ edits.put(column, HGlobals.deleteBytes.get());
+ for (HStoreKey key: keys) {
+ update(row, key.getTimestamp(), edits);
+ }
+ }
+ }
+ } finally {
+ lock.releaseReadLock();
+ }
+ }
/**
* Private implementation.
@@ -1202,10 +1311,11 @@
* Once updates hit the change log, they are safe. They will either be moved
* into an HStore in the future, or they will be recovered from the log.
* @param lockid Lock for row we're to commit.
- * @param timestamp the time to associate with this change
+ * @param timestamp the time to associate with this change.
* @throws IOException
*/
- public void commit(final long lockid, long timestamp) throws IOException {
+ public void commit(final long lockid, final long timestamp)
+ throws IOException {
// Remove the row from the pendingWrites list so
// that repeated executions won't screw this up.
Text row = getRowFromLock(lockid);
@@ -1216,19 +1326,75 @@
// This check makes sure that another thread from the client
// hasn't aborted/committed the write-operation
synchronized(row) {
- // Add updates to the log and add values to the memcache.
Long lid = Long.valueOf(lockid);
- TreeMap<Text, byte []> columns = this.targetColumns.get(lid);
- if (columns != null && columns.size() > 0) {
- log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
- row, columns, timestamp);
- memcache.add(row, columns, timestamp);
- // OK, all done!
- }
+ update(row, timestamp, this.targetColumns.get(lid));
targetColumns.remove(lid);
releaseRowLock(row);
}
}
+
+ /**
+ * This method for unit testing only.
+ * Does each operation individually so can do appropriate
+ * {@link HConstants#LATEST_TIMESTAMP} action. Tries to mimic how
+ * {@link HRegionServer#batchUpdate(Text, long, org.apache.hadoop.hbase.io.BatchUpdate)}
+ * works when passed a timestamp of LATEST_TIMESTAMP.
+ * @param lockid Lock for row we're to commit.
+ * @throws IOException
+ * @throws IOException
+ * @see #commit(long, long)
+ */
+ void commit(final long lockid) throws IOException {
+ // Remove the row from the pendingWrites list so
+ // that repeated executions won't screw this up.
+ Text row = getRowFromLock(lockid);
+ if(row == null) {
+ throw new LockException("No write lock for lockid " + lockid);
+ }
+
+ // This check makes sure that another thread from the client
+ // hasn't aborted/committed the write-operation
+ synchronized(row) {
+ Long lid = Long.valueOf(lockid);
+ TreeMap<Text, byte []> updatesByColumn = this.targetColumns.get(lid);
+ // Run updates one at a time so we can supply appropriate timestamp
+ long now = System.currentTimeMillis();
+ for (Map.Entry<Text, byte []>e: updatesByColumn.entrySet()) {
+ if (HGlobals.deleteBytes.equals(e.getValue())) {
+ // It's a delete. Delete latest. deleteMultiple calls update for us.
+ // Actually re-gets the row lock but since we already have it, should
+ // be fine.
+ deleteMultiple(row, e.getKey(), LATEST_TIMESTAMP, 1);
+ continue;
+ }
+ // Must be a 'put'.
+ TreeMap<Text, byte []> putEdit = new TreeMap<Text, byte []>();
+ putEdit.put(e.getKey(), e.getValue());
+ update(row, now, putEdit);
+ }
+ this.targetColumns.remove(lid);
+ releaseRowLock(row);
+ }
+ }
+
+ /*
+ * Add updates to the log and add values to the memcache.
+ * Warning: Assumption is caller has lock on passed in row.
+ * @param row Row to update.
+ * @param timestamp Timestamp to record the updates against
+ * @param updatesByColumn Cell updates by column
+ * @throws IOException
+ */
+ private void update(final Text row, final long timestamp,
+ final TreeMap<Text, byte []> updatesByColumn)
+ throws IOException {
+ if (updatesByColumn == null || updatesByColumn.size() <= 0) {
+ return;
+ }
+ this.log.append(regionInfo.regionName, regionInfo.tableDesc.getName(),
+ row, updatesByColumn, timestamp);
+ this.memcache.add(row, updatesByColumn, timestamp);
+ }
//////////////////////////////////////////////////////////////////////////////
// Support code
@@ -1250,7 +1416,11 @@
}
}
- /** Make sure this is a valid column for the current table */
+ /**
+ * Make sure this is a valid column for the current table
+ * @param columnName
+ * @throws IOException
+ */
void checkColumn(Text columnName) throws IOException {
Text family = new Text(HStoreKey.extractFamily(columnName) + ":");
if(! regionInfo.tableDesc.hasFamily(family)) {
@@ -1359,10 +1529,6 @@
dataFilter.reset();
}
this.scanners = new HInternalScannerInterface[stores.length + 1];
- for(int i = 0; i < this.scanners.length; i++) {
- this.scanners[i] = null;
- }
-
this.resultSets = new TreeMap[scanners.length];
this.keys = new HStoreKey[scanners.length];
this.wildcardMatch = false;
@@ -1424,12 +1590,11 @@
public boolean isMultipleMatchScanner() {
return multipleMatchers;
}
-
- /**
- * {@inheritDoc}
- */
- public boolean next(HStoreKey key, TreeMap<Text, byte[]> results)
+
+ public boolean next(HStoreKey key, SortedMap<Text, byte[]> results)
throws IOException {
+ // Filtered flag is set by filters. If a cell has been 'filtered out'
+ // -- i.e. it is not to be returned to the caller -- the flag is 'true'.
boolean filtered = true;
boolean moreToFollow = true;
while (filtered && moreToFollow) {
@@ -1446,19 +1611,27 @@
chosenTimestamp = keys[i].getTimestamp();
}
}
-
+
// Filter whole row by row key?
filtered = dataFilter != null? dataFilter.filter(chosenRow) : false;
// Store the key and results for each sub-scanner. Merge them as
// appropriate.
- if (chosenTimestamp > 0 && !filtered) {
+ if (chosenTimestamp >= 0 && !filtered) {
+ // Here we are setting the passed in key with current row+timestamp
key.setRow(chosenRow);
key.setVersion(chosenTimestamp);
- key.setColumn(new Text(""));
-
+ key.setColumn(HConstants.EMPTY_TEXT);
+ // Keep list of deleted cell keys within this row. We need this
+ // because as we go through scanners, the delete record may be in an
+ // early scanner and then the same record with a non-delete, non-null
+ // value in a later. Without history of what we've seen, we'll return
+ // deleted values. This List should not ever grow too large since we
+ // are only keeping rows and columns that match those set on the
+ // scanner and which have delete values. If memory usage becomes a
+ // problem, could redo as bloom filter.
+ List<HStoreKey> deletes = new ArrayList<HStoreKey>();
for (int i = 0; i < scanners.length && !filtered; i++) {
-
while ((scanners[i] != null
&& !filtered
&& moreToFollow)
@@ -1481,8 +1654,19 @@
// but this had the effect of overwriting newer
// values with older ones. So now we only insert
// a result if the map does not contain the key.
+ HStoreKey hsk = new HStoreKey(key.getRow(), EMPTY_TEXT,
+ key.getTimestamp());
for (Map.Entry<Text, byte[]> e : resultSets[i].entrySet()) {
- if (!filtered && moreToFollow &&
+ hsk.setColumn(e.getKey());
+ if (HGlobals.deleteBytes.equals(e.getValue())) {
+ if (!deletes.contains(hsk)) {
+ // Key changes as we cycle the for loop so add a copy to
+ // the set of deletes.
+ deletes.add(new HStoreKey(hsk));
+ }
+ } else if (!deletes.contains(hsk) &&
+ !filtered &&
+ moreToFollow &&
!results.containsKey(e.getKey())) {
if (dataFilter != null) {
// Filter whole row by column data?
@@ -1496,7 +1680,6 @@
results.put(e.getKey(), e.getValue());
}
}
-
resultSets[i].clear();
if (!scanners[i].next(keys[i], resultSets[i])) {
closeScanner(i);
@@ -1516,8 +1699,8 @@
}
}
}
-
- moreToFollow = chosenTimestamp > 0;
+
+ moreToFollow = chosenTimestamp >= 0;
if (dataFilter != null) {
if (moreToFollow) {
@@ -1533,6 +1716,17 @@
LOG.debug("ROWKEY = " + chosenRow + ", FILTERED = " + filtered);
}
}
+
+ if (results.size() <= 0 && !filtered) {
+ // There were no results found for this row. Mark it as
+ // 'filtered'-out; otherwise we will not move on to the next row.
+ filtered = true;
+ }
+ }
+
+ // If we got no results, then there is no more to follow.
+ if (results == null || results.size() <= 0) {
+ moreToFollow = false;
}
// Make sure scanners closed if no more results
@@ -1551,7 +1745,11 @@
/** Shut down a single scanner */
void closeScanner(int i) {
try {
- scanners[i].close();
+ try {
+ scanners[i].close();
+ } catch (IOException e) {
+ LOG.warn("Failed closeing scanner " + i, e);
+ }
} finally {
scanners[i] = null;
keys[i] = null;
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInterface.java Mon Sep 10 08:56:16 2007
@@ -38,8 +38,8 @@
/**
* Get metainfo about an HRegion
*
- * @param regionName - name of the region
- * @return - HRegionInfo object for region
+ * @param regionName name of the region
+ * @return HRegionInfo object for region
* @throws NotServingRegionException
*/
public HRegionInfo getRegionInfo(final Text regionName)
@@ -69,7 +69,7 @@
* @throws IOException
*/
public byte [][] get(final Text regionName, final Text row,
- final Text column, final int numVersions)
+ final Text column, final int numVersions)
throws IOException;
/**
@@ -107,7 +107,21 @@
* @param b BatchUpdate
* @throws IOException
*/
- public void batchUpdate(Text regionName, long timestamp, BatchUpdate b) throws IOException;
+ public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
+ throws IOException;
+
+ /**
+ * Delete all cells that match the passed row and column and whose
+ * timestamp is equal-to or older than the passed timestamp.
+ *
+ * @param regionName region name
+ * @param row row key
+ * @param column column key
+ * @param timestamp Delete all entries that have this timestamp or older
+ * @throws IOException
+ */
+ public void deleteAll(Text regionName, Text row, Text column, long timestamp)
+ throws IOException;
//
// remote scanner interface
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Mon Sep 10 08:56:16 2007
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
@@ -1075,22 +1076,13 @@
leases.renewLease(scannerId, scannerId);
// Collect values to be returned here
-
MapWritable values = new MapWritable();
-
- // Keep getting rows until we find one that has at least one non-deleted column value
-
HStoreKey key = new HStoreKey();
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
while (s.next(key, results)) {
for(Map.Entry<Text, byte []> e: results.entrySet()) {
- HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
- byte [] val = e.getValue();
- if (HGlobals.deleteBytes.compareTo(val) == 0) {
- // Column value is deleted. Don't return it.
- continue;
- }
- values.put(k, new ImmutableBytesWritable(val));
+ values.put(new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()),
+ new ImmutableBytesWritable(e.getValue()));
}
if(values.size() > 0) {
@@ -1099,7 +1091,6 @@
}
// No data for this row, go get another.
-
results.clear();
}
return values;
@@ -1110,26 +1101,46 @@
}
}
- /** {@inheritDoc} */
public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
- throws IOException {
-
+ throws IOException {
requestCount.incrementAndGet();
+ // If timestamp == LATEST_TIMESTAMP and we have deletes, then they need
+ // special treatment. For these we need to first find the latest cell so
+ // when we write the delete, we write it with the latest cells' timestamp
+ // so the delete record overshadows. This means deletes and puts do not
+ // happen within the same row lock.
+ List<Text> deletes = null;
try {
long lockid = startUpdate(regionName, b.getRow());
for(BatchOperation op: b) {
switch(op.getOp()) {
- case BatchOperation.PUT_OP:
+ case PUT:
put(regionName, lockid, op.getColumn(), op.getValue());
break;
- case BatchOperation.DELETE_OP:
- delete(regionName, lockid, op.getColumn());
+ case DELETE:
+ if (timestamp == LATEST_TIMESTAMP) {
+ // Save off these deletes.
+ if (deletes == null) {
+ deletes = new ArrayList<Text>();
+ }
+ deletes.add(op.getColumn());
+ } else {
+ delete(regionName, lockid, op.getColumn());
+ }
break;
}
}
- commit(regionName, lockid, timestamp);
+ commit(regionName, lockid,
+ (timestamp == LATEST_TIMESTAMP)? System.currentTimeMillis(): timestamp);
+ if (deletes != null && deletes.size() > 0) {
+ // We have some LATEST_TIMESTAMP deletes to run.
+ HRegion r = getRegion(regionName);
+ for (Text column: deletes) {
+ r.deleteMultiple(b.getRow(), column, LATEST_TIMESTAMP, 1);
+ }
+ }
} catch (IOException e) {
checkFileSystem();
throw e;
@@ -1158,7 +1169,6 @@
}
leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
return scannerId;
-
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
@@ -1217,7 +1227,11 @@
s = scanners.remove(this.scannerName);
}
if (s != null) {
- s.close();
+ try {
+ s.close();
+ } catch (IOException e) {
+ LOG.error("Closing scanner", e);
+ }
}
}
}
@@ -1241,9 +1255,15 @@
protected void delete(Text regionName, long lockid, Text column)
throws IOException {
-
HRegion region = getRegion(regionName);
region.delete(lockid, column);
+ }
+
+ public void deleteAll(final Text regionName, final Text row,
+ final Text column, final long timestamp)
+ throws IOException {
+ HRegion region = getRegion(regionName);
+ region.deleteAll(row, column, timestamp);
}
protected void commit(Text regionName, final long lockid,
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HScannerInterface.java Mon Sep 10 08:56:16 2007
@@ -30,9 +30,12 @@
*/
public interface HScannerInterface {
/**
- * Get the next set of values
+ * Grab the next row's worth of values. The scanner will return the most
+ * recent data value for each row that is not newer than the target time
+ * passed when the scanner was created.
* @param key will contain the row and timestamp upon return
- * @param results will contain an entry for each column family member and its value
+ * @param results will contain an entry for each column family member and its
+ * value
* @return true if data was returned
* @throws IOException
*/
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Mon Sep 10 08:56:16 2007
@@ -24,6 +24,9 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -86,7 +89,14 @@
final HLocking lock = new HLocking();
+ /* Sorted Map of store files keyed by sequence id (Most recent should be last
+ * in list).
+ */
TreeMap<Long, HStoreFile> storefiles = new TreeMap<Long, HStoreFile>();
+
+ /* Sorted Map of readers keyed by sequence id (Most recent should be last
+ * in list).
+ */
TreeMap<Long, MapFile.Reader> readers = new TreeMap<Long, MapFile.Reader>();
Random rand = new Random();
@@ -176,7 +186,7 @@
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
// corresponding one in 'loginfodir'. Without a corresponding log info
// file, the entry in 'mapdir' must be deleted.
- Vector<HStoreFile> hstoreFiles
+ Collection<HStoreFile> hstoreFiles
= HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
for(HStoreFile hsf: hstoreFiles) {
this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
@@ -446,30 +456,23 @@
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
this.bloomFilter);
- // hbase.hstore.compact.on.flush=true enables picking up an existing
- // HStoreFIle from disk interlacing the memcache flush compacting as we
- // go. The notion is that interlacing would take as long as a pure
- // flush with the added benefit of having one less file in the store.
- // Experiments show that it takes two to three times the amount of time
- // flushing -- more column families makes it so the two timings come
- // closer together -- but it also complicates the flush. Disabled for
- // now. Needs work picking which file to interlace (favor references
- // first, etc.)
+ // Here we tried picking up an existing HStoreFile from disk and
+ // interlacing the memcache flush compacting as we go. The notion was
+ // that interlacing would take as long as a pure flush with the added
+ // benefit of having one less file in the store. Experiments showed that
+ // it takes two to three times the amount of time flushing -- more column
+ // families makes it so the two timings come closer together -- but it
+ // also complicates the flush. The code was removed. Needed work picking
+ // which file to interlace (favor references first, etc.)
//
// Related, looks like 'merging compactions' in BigTable paper interlaces
// a memcache flush. We don't.
try {
- if (this.conf.getBoolean("hbase.hstore.compact.on.flush", false) &&
- this.storefiles.size() > 0) {
- compact(out, inputCache.entrySet().iterator(),
- this.readers.get(this.storefiles.firstKey()));
- } else {
- for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
- HStoreKey curkey = es.getKey();
- if (this.familyName.
- equals(HStoreKey.extractFamily(curkey.getColumn()))) {
- out.append(curkey, new ImmutableBytesWritable(es.getValue()));
- }
+ for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
+ HStoreKey curkey = es.getKey();
+ if (this.familyName.
+ equals(HStoreKey.extractFamily(curkey.getColumn()))) {
+ out.append(curkey, new ImmutableBytesWritable(es.getValue()));
}
}
} finally {
@@ -546,7 +549,6 @@
*
* We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
- *
* @throws IOException
*/
void compact() throws IOException {
@@ -564,6 +566,8 @@
* @param maxSeenSeqID We may have already calculated the maxSeenSeqID. If
* so, pass it here. Otherwise, pass -1 and it will be calculated inside in
* this method.
+ * @param deleteSequenceInfo
+ * @param maxSeenSeqID
* @throws IOException
*/
void compactHelper(final boolean deleteSequenceInfo, long maxSeenSeqID)
@@ -584,7 +588,7 @@
}
}
try {
- Vector<HStoreFile> toCompactFiles = getFilesToCompact();
+ List<HStoreFile> toCompactFiles = getFilesToCompact();
HStoreFile compactedOutputFile =
new HStoreFile(conf, this.compactionDir, regionName, familyName, -1);
if (toCompactFiles.size() < 1 ||
@@ -664,17 +668,21 @@
}
/*
- * @return list of files to compact
+ * @return list of files to compact sorted so most recent comes first.
*/
- private Vector<HStoreFile> getFilesToCompact() {
- Vector<HStoreFile> toCompactFiles = null;
+ private List<HStoreFile> getFilesToCompact() {
+ List<HStoreFile> filesToCompact = null;
this.lock.obtainWriteLock();
try {
- toCompactFiles = new Vector<HStoreFile>(storefiles.values());
+ // Storefiles are keyed by sequence id. The oldest file comes first.
+ // We need to return out of here a List that has the newest file as
+ // first.
+ filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
+ Collections.reverse(filesToCompact);
} finally {
this.lock.releaseWriteLock();
}
- return toCompactFiles;
+ return filesToCompact;
}
/*
@@ -694,7 +702,7 @@
* @throws IOException
*/
void compact(final MapFile.Writer compactedOut,
- final Vector<HStoreFile> toCompactFiles)
+ final List<HStoreFile> toCompactFiles)
throws IOException {
int size = toCompactFiles.size();
CompactionReader[] rdrs = new CompactionReader[size];
@@ -842,8 +850,14 @@
int timesSeen = 0;
Text lastRow = new Text();
Text lastColumn = new Text();
- while(numDone < done.length) {
- // Find the reader with the smallest key
+ // Map of a row deletes keyed by column with a list of timestamps for value
+ Map<Text, List<Long>> deletes = null;
+ while (numDone < done.length) {
+ // Find the reader with the smallest key. If two files have same key
+ // but different values -- i.e. one is delete and other is non-delete
+ // value -- we will find the first, the one that was written later and
+ // therefore the one whose value should make it out to the compacted
+ // store file.
int smallestKey = -1;
for(int i = 0; i < rdrs.length; i++) {
if(done[i]) {
@@ -865,24 +879,23 @@
timesSeen++;
} else {
timesSeen = 1;
+ // We are on to a new row. Create a new deletes list.
+ deletes = new HashMap<Text, List<Long>>();
}
- if(timesSeen <= family.getMaxVersions()) {
+ byte [] value = (vals[smallestKey] == null)?
+ null: vals[smallestKey].get();
+ if (!isDeleted(sk, value, null, deletes) &&
+ timesSeen <= family.getMaxVersions()) {
// Keep old versions until we have maxVersions worth.
// Then just skip them.
- if(sk.getRow().getLength() != 0
- && sk.getColumn().getLength() != 0) {
+ if (sk.getRow().getLength() != 0 && sk.getColumn().getLength() != 0) {
// Only write out objects which have a non-zero length key and
// value
compactedOut.append(sk, vals[smallestKey]);
}
}
- // TODO: I don't know what to do about deleted values. I currently
- // include the fact that the item was deleted as a legitimate
- // "version" of the data. Maybe it should just drop the deleted
- // val?
-
// Update last-seen items
lastRow.set(sk.getRow());
lastColumn.set(sk.getColumn());
@@ -900,6 +913,52 @@
}
/*
+ * Check if this cell is deleted.
+ * If a memcache and a deletes, check key does not have an entry filled.
+ * Otherwise, check value is not the <code>HGlobals.deleteBytes</code> value.
+ * If passed value IS deleteBytes, then it is added to the passed
+ * deletes map.
+ * @param hsk
+ * @param value
+ * @param memcache Can be null.
+ * @param deletes Map keyed by column with a value of timestamp. Can be null.
+ * If non-null and passed value is HGlobals.deleteBytes, then we add to this
+ * map.
+ * @return True if this is a deleted cell. Adds to the passed deletes map if
+ * passed value is HGlobals.deleteBytes.
+ */
+ private boolean isDeleted(final HStoreKey hsk, final byte [] value,
+ final HMemcache memcache, final Map<Text, List<Long>> deletes) {
+ if (memcache != null && memcache.isDeleted(hsk)) {
+ return true;
+ }
+ List<Long> timestamps = (deletes == null)?
+ null: deletes.get(hsk.getColumn());
+ if (timestamps != null &&
+ timestamps.contains(Long.valueOf(hsk.getTimestamp()))) {
+ return true;
+ }
+ if (value == null) {
+ // If a null value, shouldn't be in here. Mark it as deleted cell.
+ return true;
+ }
+ if (!HGlobals.deleteBytes.equals(value)) {
+ return false;
+ }
+ // Cell has delete value. Save it into deletes.
+ if (deletes != null) {
+ if (timestamps == null) {
+ timestamps = new ArrayList<Long>();
+ deletes.put(hsk.getColumn(), timestamps);
+ }
+ // We know its not already in the deletes array else we'd have returned
+ // earlier so no need to test if timestamps already has this value.
+ timestamps.add(Long.valueOf(hsk.getTimestamp()));
+ }
+ return true;
+ }
+
+ /*
* It's assumed that the compactLock will be acquired prior to calling this
* method! Otherwise, it is not thread-safe!
*
@@ -1061,22 +1120,37 @@
* previous 'numVersions-1' values, as well.
*
* If 'numVersions' is negative, the method returns all available versions.
+ * @param key
+ * @param numVersions Number of versions to fetch. Must be > 0.
+ * @param memcache Checked for deletions
+ * @return
+ * @throws IOException
*/
- byte [][] get(HStoreKey key, int numVersions) throws IOException {
+ byte [][] get(HStoreKey key, int numVersions, final HMemcache memcache)
+ throws IOException {
if (numVersions <= 0) {
throw new IllegalArgumentException("Number of versions must be > 0");
}
List<byte []> results = new ArrayList<byte []>();
+ // Keep a list of deleted cell keys. We need this because as we go through
+ // the store files, the cell with the delete marker may be in one file and
+ // the old non-delete cell value in a later store file. If we don't keep
+ // around the fact that the cell was deleted in a newer record, we end up
+ // returning the old value if user is asking for more than one version.
+ // This List of deletes should not be large since we are only keeping rows
+ // and columns that match those set on the scanner and which have delete
+ // values. If memory usage becomes an issue, could redo as bloom filter.
+ Map<Text, List<Long>> deletes = new HashMap<Text, List<Long>>();
+ // This code below is very close to the body of the getKeys method.
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray = getReaders();
for(int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
-
synchronized(map) {
- ImmutableBytesWritable readval = new ImmutableBytesWritable();
map.reset();
+ ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
if (readkey == null) {
// map.getClosest returns null if the passed key is > than the
@@ -1085,29 +1159,102 @@
// BEFORE.
continue;
}
- if (readkey.matchesRowCol(key)) {
- if(readval.equals(HGlobals.deleteBytes)) {
+ if (!readkey.matchesRowCol(key)) {
+ continue;
+ }
+ if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
+ results.add(readval.get());
+ // Perhaps only one version is wanted. I could let this
+ // test happen later in the for loop test but it would cost
+ // the allocation of an ImmutableBytesWritable.
+ if (hasEnoughVersions(numVersions, results)) {
break;
}
- results.add(readval.get());
- readval = new ImmutableBytesWritable();
- while(map.next(readkey, readval) && readkey.matchesRowCol(key)) {
- if ((numVersions > 0 && (results.size() >= numVersions))
- || readval.equals(HGlobals.deleteBytes)) {
- break;
- }
+ }
+ while ((readval = new ImmutableBytesWritable()) != null &&
+ map.next(readkey, readval) &&
+ readkey.matchesRowCol(key) &&
+ !hasEnoughVersions(numVersions, results)) {
+ if (!isDeleted(readkey, readval.get(), memcache, deletes)) {
results.add(readval.get());
- readval = new ImmutableBytesWritable();
}
}
}
- if(results.size() >= numVersions) {
+ if (hasEnoughVersions(numVersions, results)) {
break;
}
}
-
return results.size() == 0 ?
null : ImmutableBytesWritable.toArray(results);
+ } finally {
+ this.lock.releaseReadLock();
+ }
+ }
+
+ private boolean hasEnoughVersions(final int numVersions,
+ final List<byte []> results) {
+ return numVersions > 0 && results.size() >= numVersions;
+ }
+
+ /**
+ * Get <code>versions</code> keys matching the origin key's
+ * row/column/timestamp and those of an older vintage
+ * Default access so can be accessed out of {@link HRegionServer}.
+ * @param origin Where to start searching.
+ * @param versions How many versions to return. Pass
+ * {@link HConstants#ALL_VERSIONS} to retrieve all. Versions will include
+ * size of passed <code>allKeys</code> in its count.
+ * @param allKeys List of keys prepopulated by keys we found in memcache.
+ * This method returns this passed list with all matching keys found in
+ * stores appended.
+ * @return The passed <code>allKeys</code> with <code>versions</code> of
+ * matching keys found in store files appended.
+ * @throws IOException
+ */
+ List<HStoreKey> getKeys(final HStoreKey origin, List<HStoreKey> allKeys,
+ final int versions)
+ throws IOException {
+ if (allKeys == null) {
+ allKeys = new ArrayList<HStoreKey>();
+ }
+ // This code below is very close to the body of the get method.
+ this.lock.obtainReadLock();
+ try {
+ MapFile.Reader[] maparray = getReaders();
+ for(int i = maparray.length - 1; i >= 0; i--) {
+ MapFile.Reader map = maparray[i];
+ synchronized(map) {
+ map.reset();
+ ImmutableBytesWritable readval = new ImmutableBytesWritable();
+ HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval);
+ if (readkey == null) {
+ // map.getClosest returns null if the passed key is > than the
+ // last key in the map file. getClosest is a bit of a misnomer
+ // since it returns exact match or the next closest key AFTER not
+ // BEFORE.
+ continue;
+ }
+ if (!readkey.matchesRowCol(origin)) {
+ continue;
+ }
+ if (!isDeleted(readkey, readval.get(), null, null) &&
+ !allKeys.contains(readkey)) {
+ allKeys.add(new HStoreKey(readkey));
+ }
+ while ((readval = new ImmutableBytesWritable()) != null &&
+ map.next(readkey, readval) &&
+ readkey.matchesRowCol(origin)) {
+ if (!isDeleted(readkey, readval.get(), null, null) &&
+ !allKeys.contains(readkey)) {
+ allKeys.add(new HStoreKey(readkey));
+ if (versions != ALL_VERSIONS && allKeys.size() >= versions) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ return allKeys;
} finally {
this.lock.releaseReadLock();
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java Mon Sep 10 08:56:16 2007
@@ -35,7 +35,6 @@
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -531,10 +530,11 @@
}
/**
- * Delete the value for a column
- *
- * @param lockid - lock id returned from startUpdate
- * @param column - name of column whose value is to be deleted
+ * Delete the value for a column.
+ * Deletes the cell whose row/column/commit-timestamp match those of the
+ * delete.
+ * @param lockid lock id returned from startUpdate
+ * @param column name of column whose value is to be deleted
*/
public void delete(long lockid, Text column) {
checkClosed();
@@ -543,9 +543,59 @@
}
/**
+ * Delete all values for a column
+ *
+ * @param row Row to update
+ * @param column name of column whose value is to be deleted
+ * @throws IOException
+ */
+ public void deleteAll(final Text row, final Text column) throws IOException {
+ deleteAll(row, column, LATEST_TIMESTAMP);
+ }
+
+ /**
+ * Delete all values for a column
+ *
+ * @param row Row to update
+ * @param column name of column whose value is to be deleted
+ * @param ts Delete all cells of the same timestamp or older.
+ * @throws IOException
+ */
+ public void deleteAll(final Text row, final Text column, final long ts)
+ throws IOException {
+ checkClosed();
+ for(int tries = 0; tries < numRetries; tries++) {
+ HRegionLocation r = getRegionLocation(row);
+ HRegionInterface server =
+ connection.getHRegionConnection(r.getServerAddress());
+ try {
+ server.deleteAll(r.getRegionInfo().getRegionName(), row, column, ts);
+ break;
+
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ }
+ if (tries == numRetries - 1) {
+ throw e;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("reloading table servers because: " + e.getMessage());
+ }
+ tableServers = connection.reloadTableServers(tableName);
+ }
+ try {
+ Thread.sleep(this.pause);
+ } catch (InterruptedException x) {
+ // continue
+ }
+ }
+ }
+
+ /**
* Abort a row mutation
*
- * @param lockid - lock id returned from startUpdate
+ * @param lockid lock id returned from startUpdate
*/
public synchronized void abort(long lockid) {
checkClosed();
@@ -558,24 +608,26 @@
/**
* Finalize a row mutation
- *
- * @param lockid - lock id returned from startUpdate
+ * When this method is specified, we pass the server a value that says use
+ * the 'latest' timestamp. If we are doing a put, on the server-side, cells
+ * will be given the server's current timestamp. If we are committing
+ * deletes, then delete removes the most recently modified cell of stipulated
+ * column.
+ * @param lockid lock id returned from startUpdate
* @throws IOException
*/
public void commit(long lockid) throws IOException {
- commit(lockid, System.currentTimeMillis());
+ commit(lockid, LATEST_TIMESTAMP);
}
/**
* Finalize a row mutation
- *
- * @param lockid - lock id returned from startUpdate
- * @param timestamp - time to associate with the change
+ * @param lockid lock id returned from startUpdate
+ * @param timestamp time to associate with the change
* @throws IOException
*/
public synchronized void commit(long lockid, long timestamp)
throws IOException {
-
checkClosed();
updateInProgress(true);
if (batch.get().getLockid() != lockid) {
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java Mon Sep 10 08:56:16 2007
@@ -27,47 +27,53 @@
import org.apache.hadoop.io.Writable;
/**
- * batch update operation
+ * Batch update operations such as put, delete, and deleteAll.
*/
public class BatchOperation implements Writable {
- /** put operation */
- public static final int PUT_OP = 1;
-
- /** delete operation */
- public static final int DELETE_OP = 2;
-
- private int op;
+ /**
+ * Operation types.
+ * @see org.apache.hadoop.io.SequenceFile.Writer
+ */
+ public static enum Operation {PUT, DELETE}
+
+ private Operation op;
private Text column;
private byte[] value;
/** default constructor used by Writable */
public BatchOperation() {
- this.op = 0;
- this.column = new Text();
- this.value = null;
+ this(new Text());
}
-
/**
- * Creates a put operation
+ * Creates a DELETE operation
+ *
+ * @param column column name
+ */
+ public BatchOperation(final Text column) {
+ this(Operation.DELETE, column, null);
+ }
+
+ /**
+ * Creates a PUT operation
*
* @param column column name
* @param value column value
*/
- public BatchOperation(Text column, byte[] value) {
- this.op = PUT_OP;
- this.column = column;
- this.value = value;
+ public BatchOperation(final Text column, final byte [] value) {
+ this(Operation.PUT, column, value);
}
/**
- * Creates a delete operation
+ * Creates an operation of the passed type
*
- * @param column name of column to delete
+ * @param column column name
+ * @param value column value
*/
- public BatchOperation(Text column) {
- this.op = DELETE_OP;
+ public BatchOperation(final Operation operation, final Text column,
+ final byte[] value) {
+ this.op = operation;
this.column = column;
- this.value = null;
+ this.value = value;
}
/**
@@ -80,8 +86,8 @@
/**
* @return the operation
*/
- public int getOp() {
- return op;
+ public Operation getOp() {
+ return this.op;
}
/**
@@ -99,9 +105,10 @@
* {@inheritDoc}
*/
public void readFields(DataInput in) throws IOException {
- op = in.readInt();
+ int ordinal = in.readInt();
+ this.op = Operation.values()[ordinal];
column.readFields(in);
- if(op == PUT_OP) {
+ if (this.op == Operation.PUT) {
value = new byte[in.readInt()];
in.readFully(value);
}
@@ -111,11 +118,11 @@
* {@inheritDoc}
*/
public void write(DataOutput out) throws IOException {
- out.writeInt(op);
+ out.writeInt(this.op.ordinal());
column.write(out);
- if(op == PUT_OP) {
+ if (this.op == Operation.PUT) {
out.writeInt(value.length);
out.write(value);
}
}
-}
+}
\ No newline at end of file
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java Mon Sep 10 08:56:16 2007
@@ -61,7 +61,7 @@
*/
public BatchUpdate(long lockid) {
this.row = new Text();
- this.lockid = Long.valueOf(Math.abs(lockid));
+ this.lockid = Math.abs(lockid);
this.operations = new ArrayList<BatchOperation>();
}
@@ -97,27 +97,28 @@
/**
* Change a value for the specified column
*
- * @param lockid - lock id returned from startUpdate
- * @param column - column whose value is being set
- * @param val - new value for column
+ * @param lid lock id returned from startUpdate
+ * @param column column whose value is being set
+ * @param val new value for column
*/
- public synchronized void put(final long lockid, final Text column,
+ public synchronized void put(final long lid, final Text column,
final byte val[]) {
- if(this.lockid != lockid) {
- throw new IllegalArgumentException("invalid lockid " + lockid);
+ if(this.lockid != lid) {
+ throw new IllegalArgumentException("invalid lockid " + lid);
}
operations.add(new BatchOperation(column, val));
}
/**
* Delete the value for a column
- *
- * @param lockid - lock id returned from startUpdate
- * @param column - name of column whose value is to be deleted
+ * Deletes the cell whose row/column/commit-timestamp match those of the
+ * delete.
+ * @param lid lock id returned from startUpdate
+ * @param column name of column whose value is to be deleted
*/
- public synchronized void delete(final long lockid, final Text column) {
- if(this.lockid != lockid) {
- throw new IllegalArgumentException("invalid lockid " + lockid);
+ public synchronized void delete(final long lid, final Text column) {
+ if(this.lockid != lid) {
+ throw new IllegalArgumentException("invalid lockid " + lid);
}
operations.add(new BatchOperation(column));
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/ImmutableBytesWritable.java Mon Sep 10 08:56:16 2007
@@ -153,6 +153,9 @@
/** {@inheritDoc} */
@Override
public boolean equals(Object right_obj) {
+ if (right_obj instanceof byte []) {
+ return compareTo((byte [])right_obj) == 0;
+ }
if (right_obj instanceof ImmutableBytesWritable) {
return compareTo(right_obj) == 0;
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java Mon Sep 10 08:56:16 2007
@@ -26,6 +26,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor.CompressionType;
import org.apache.hadoop.io.Text;
/**
@@ -41,6 +42,7 @@
protected static final char LAST_CHAR = 'z';
protected static final byte [] START_KEY_BYTES =
{FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
+ protected static final int MAXVERSIONS = 3;
static {
StaticTestEnvironment.initialize();
@@ -100,10 +102,18 @@
}
protected HTableDescriptor createTableDescriptor(final String name) {
+ return createTableDescriptor(name, MAXVERSIONS);
+ }
+
+ protected HTableDescriptor createTableDescriptor(final String name,
+ final int versions) {
HTableDescriptor htd = new HTableDescriptor(name);
- htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1));
- htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2));
- htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
+ htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME1), versions,
+ CompressionType.NONE, false, Integer.MAX_VALUE, null));
+ htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME2), versions,
+ CompressionType.NONE, false, Integer.MAX_VALUE, null));
+ htd.addFamily(new HColumnDescriptor(new Text(COLFAMILY_NAME3), versions,
+ CompressionType.NONE, false, Integer.MAX_VALUE, null));
return htd;
}
@@ -123,18 +133,18 @@
if (startKeyBytes == null || startKeyBytes.length == 0) {
startKeyBytes = START_KEY_BYTES;
}
- addContent(new HRegionLoader(r), column, startKeyBytes, endKey, -1);
+ addContent(new HRegionIncommon(r), column, startKeyBytes, endKey, -1);
}
/**
* Add content to region <code>r</code> on the passed column
* <code>column</code>.
* Adds data of the from 'aaa', 'aab', etc where key and value are the same.
- * @param updater An instance of {@link Loader}.
+ * @param updater An instance of {@link Incommon}.
* @param column
* @throws IOException
*/
- protected static void addContent(final Loader updater, final String column)
+ protected static void addContent(final Incommon updater, final String column)
throws IOException {
addContent(updater, column, START_KEY_BYTES, null);
}
@@ -143,13 +153,13 @@
* Add content to region <code>r</code> on the passed column
* <code>column</code>.
* Adds data of the from 'aaa', 'aab', etc where key and value are the same.
- * @param updater An instance of {@link Loader}.
+ * @param updater An instance of {@link Incommon}.
* @param column
* @param startKeyBytes Where to start the rows inserted
* @param endKey Where to stop inserting rows.
* @throws IOException
*/
- protected static void addContent(final Loader updater, final String column,
+ protected static void addContent(final Incommon updater, final String column,
final byte [] startKeyBytes, final Text endKey)
throws IOException {
addContent(updater, column, startKeyBytes, endKey, -1);
@@ -159,14 +169,14 @@
* Add content to region <code>r</code> on the passed column
* <code>column</code>.
* Adds data of the from 'aaa', 'aab', etc where key and value are the same.
- * @param updater An instance of {@link Loader}.
+ * @param updater An instance of {@link Incommon}.
* @param column
* @param startKeyBytes Where to start the rows inserted
* @param endKey Where to stop inserting rows.
* @param ts Timestamp to write the content with.
* @throws IOException
*/
- protected static void addContent(final Loader updater, final String column,
+ protected static void addContent(final Incommon updater, final String column,
final byte [] startKeyBytes, final Text endKey, final long ts)
throws IOException {
// Add rows of three characters. The first character starts with the
@@ -207,23 +217,42 @@
}
/**
- * Interface used by the addContent methods so either a HTable or a HRegion
- * can be passed to the methods.
+ * Implementors can flushcache.
+ */
+ public static interface FlushCache {
+ public void flushcache() throws IOException;
+ }
+
+ /**
+ * Interface used by tests so can do common operations against an HTable
+ * or an HRegion.
+ *
+ * TODO: Come up w/ a better name for this interface.
*/
- public static interface Loader {
+ public static interface Incommon {
+ public byte [] get(Text row, Text column) throws IOException;
+ public byte [][] get(Text row, Text column, int versions)
+ throws IOException;
+ public byte [][] get(Text row, Text column, long ts, int versions)
+ throws IOException;
public long startBatchUpdate(final Text row) throws IOException;
public void put(long lockid, Text column, byte val[]) throws IOException;
+ public void delete(long lockid, Text column) throws IOException;
+ public void deleteAll(Text row, Text column, long ts) throws IOException;
public void commit(long lockid) throws IOException;
public void commit(long lockid, long ts) throws IOException;
public void abort(long lockid) throws IOException;
+ public HScannerInterface getScanner(Text [] columns, Text firstRow,
+ long ts)
+ throws IOException;
}
/**
- * A class that makes a {@link Loader} out of a {@link HRegion}
+ * A class that makes a {@link Incommon} out of a {@link HRegion}
*/
- public static class HRegionLoader implements Loader {
+ public static class HRegionIncommon implements Incommon {
final HRegion region;
- public HRegionLoader(final HRegion HRegion) {
+ public HRegionIncommon(final HRegion HRegion) {
super();
this.region = HRegion;
}
@@ -231,7 +260,7 @@
this.region.abort(lockid);
}
public void commit(long lockid) throws IOException {
- this.region.commit(lockid, System.currentTimeMillis());
+ this.region.commit(lockid);
}
public void commit(long lockid, final long ts) throws IOException {
this.region.commit(lockid, ts);
@@ -239,17 +268,38 @@
public void put(long lockid, Text column, byte[] val) throws IOException {
this.region.put(lockid, column, val);
}
+ public void delete(long lockid, Text column) throws IOException {
+ this.region.delete(lockid, column);
+ }
+ public void deleteAll(Text row, Text column, long ts) throws IOException {
+ this.region.deleteAll(row, column, ts);
+ }
public long startBatchUpdate(Text row) throws IOException {
return this.region.startUpdate(row);
}
+ public HScannerInterface getScanner(Text [] columns, Text firstRow,
+ long ts)
+ throws IOException {
+ return this.region.getScanner(columns, firstRow, ts, null);
+ }
+ public byte[] get(Text row, Text column) throws IOException {
+ return this.region.get(row, column);
+ }
+ public byte[][] get(Text row, Text column, int versions) throws IOException {
+ return this.region.get(row, column, versions);
+ }
+ public byte[][] get(Text row, Text column, long ts, int versions)
+ throws IOException {
+ return this.region.get(row, column, ts, versions);
+ }
}
/**
- * A class that makes a {@link Loader} out of a {@link HTable}
+ * A class that makes a {@link Incommon} out of a {@link HTable}
*/
- public static class HTableLoader implements Loader {
+ public static class HTableIncommon implements Incommon {
final HTable table;
- public HTableLoader(final HTable table) {
+ public HTableIncommon(final HTable table) {
super();
this.table = table;
}
@@ -265,8 +315,30 @@
public void put(long lockid, Text column, byte[] val) throws IOException {
this.table.put(lockid, column, val);
}
+ public void delete(long lockid, Text column) throws IOException {
+ this.table.delete(lockid, column);
+ }
+ public void deleteAll(Text row, Text column, long ts) throws IOException {
+ this.table.deleteAll(row, column, ts);
+ }
public long startBatchUpdate(Text row) {
return this.table.startUpdate(row);
+ }
+ public HScannerInterface getScanner(Text [] columns, Text firstRow,
+ long ts)
+ throws IOException {
+ return this.table.obtainScanner(columns, firstRow, ts, null);
+ }
+ public byte[] get(Text row, Text column) throws IOException {
+ return this.table.get(row, column);
+ }
+ public byte[][] get(Text row, Text column, int versions)
+ throws IOException {
+ return this.table.get(row, column, versions);
+ }
+ public byte[][] get(Text row, Text column, long ts, int versions)
+ throws IOException {
+ return this.table.get(row, column, ts, versions);
}
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Mon Sep 10 08:56:16 2007
@@ -417,4 +417,15 @@
}
f.delete();
}
+
+ /**
+ * Call flushcache on all online regions of the first regionserver only.
+ * @throws IOException
+ */
+ void flushcache() throws IOException {
+ HRegionServer s = this.regionThreads.get(0).getRegionServer();
+ for(HRegion r: s.onlineRegions.values() ) {
+ r.flushcache(false);
+ }
+ }
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Mon Sep 10 08:56:16 2007
@@ -54,7 +54,7 @@
HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
int count = count(meta, HConstants.COLUMN_FAMILY_STR);
HTable t = new HTable(conf, new Text(tableName));
- addContent(new HTableLoader(t), columnName);
+ addContent(new HTableIncommon(t), columnName);
// All is running in the one JVM so I should be able to get the single
// region instance and bring on a split.
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCompaction.java Mon Sep 10 08:56:16 2007
@@ -23,71 +23,133 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
/**
* Test compactions
*/
public class TestCompaction extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
-
+ private HLog hlog = null;
+ private HRegion r = null;
+ private static final String COLUMN_FAMILY = COLFAMILY_NAME1;
+ private static final Text STARTROW = new Text(START_KEY_BYTES);
+ private static final Text COLUMN_FAMILY_TEXT = new Text(COLUMN_FAMILY);
+ private static final Text COLUMN_FAMILY_TEXT_MINUS_COLON =
+ new Text(COLUMN_FAMILY.substring(0, COLUMN_FAMILY.length() - 1));
+ private static final int COMPACTION_THRESHOLD = MAXVERSIONS;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ this.hlog = new HLog(this.localFs, this.testDir, this.conf);
+ HTableDescriptor htd = createTableDescriptor(getName());
+ HRegionInfo hri = new HRegionInfo(1, htd, null, null);
+ this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ this.r.close();
+ this.hlog.closeAndDelete();
+ super.tearDown();
+ }
+
/**
* Run compaction and flushing memcache
+ * Assert deletes get cleaned up.
* @throws Exception
*/
public void testCompaction() throws Exception {
- HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
- HTableDescriptor htd = createTableDescriptor(getName());
- HRegionInfo hri = new HRegionInfo(1, htd, null, null);
- final HRegion r =
- new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
- try {
+ createStoreFile(r);
+ assertFalse(r.needsCompaction());
+ for (int i = 0; i < COMPACTION_THRESHOLD; i++) {
createStoreFile(r);
- assertFalse(r.needsCompaction());
- int compactionThreshold =
- this.conf.getInt("hbase.hstore.compactionThreshold", 3);
- for (int i = 0; i < compactionThreshold; i++) {
- createStoreFile(r);
- }
- assertTrue(r.needsCompaction());
- // Try to run compaction concurrent with a thread flush.
- addContent(new HRegionLoader(r), COLFAMILY_NAME1);
- Thread t1 = new Thread() {
- @Override
- public void run() {
- try {
- r.flushcache(false);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ }
+ assertTrue(r.needsCompaction());
+ // Add more content. Now there are about 5 versions of each column.
+ // Default is that there are only 3 (MAXVERSIONS) versions allowed per column.
+ // Assert > 3 and then after compaction, assert that only 3 versions
+ // available.
+ addContent(new HRegionIncommon(r), COLUMN_FAMILY);
+ byte [][] bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
+ // Assert that I can get >= 5 versions (Should be at least 5 in there).
+ assertTrue(bytes.length >= 5);
+ // Try to run compaction concurrent with a thread flush just to see that
+ // we can.
+ final HRegion region = this.r;
+ Thread t1 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ region.flushcache(false);
+ } catch (IOException e) {
+ e.printStackTrace();
}
- };
- Thread t2 = new Thread() {
- @Override
- public void run() {
- try {
- assertTrue(r.compactStores());
- } catch (IOException e) {
- e.printStackTrace();
- }
+ }
+ };
+ Thread t2 = new Thread() {
+ @Override
+ public void run() {
+ try {
+ assertTrue(region.compactStores());
+ } catch (IOException e) {
+ e.printStackTrace();
}
- };
- t1.setDaemon(true);
- t1.start();
- t2.setDaemon(true);
- t2.start();
- t1.join();
- t2.join();
- } finally {
- r.close();
- hlog.closeAndDelete();
+ }
+ };
+ t1.setDaemon(true);
+ t1.start();
+ t2.setDaemon(true);
+ t2.start();
+ t1.join();
+ t2.join();
+ // Now assert that there are 4 versions of a record only: that's the
+ // 3 versions that should be in the compacted store and then the one more
+ // we added when we compacted.
+ byte [] secondRowBytes = new byte[START_KEY_BYTES.length];
+ System.arraycopy(START_KEY_BYTES, 0, secondRowBytes, 0,
+ START_KEY_BYTES.length);
+ // Increment the least significant character so we get to next row.
+ secondRowBytes[START_KEY_BYTES.length - 1]++;
+ Text secondRow = new Text(secondRowBytes);
+ bytes = this.r.get(secondRow, COLUMN_FAMILY_TEXT, 100/*Too many*/);
+ assertTrue(bytes.length == 4);
+ // Now add deletes to memcache and then flush it. That will put us over
+ // the compaction threshold of 3 store files. Compacting these store files
+ // should result in a compacted store file that has no references to the
+ // deleted row.
+ this.r.deleteAll(STARTROW, COLUMN_FAMILY_TEXT, System.currentTimeMillis());
+ // Now, before compacting, remove all instances of the first row so can
+ // verify that it is removed as we compact.
+ // Assert all deleted.
+ assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
+ this.r.flushcache(false);
+ assertNull(this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/));
+ assertTrue(this.r.needsCompaction());
+ this.r.compactStores();
+ // Assert that the first row is still deleted.
+ bytes = this.r.get(STARTROW, COLUMN_FAMILY_TEXT, 100 /*Too many*/);
+ assertNull(bytes);
+ // Assert the store files do not have the first record 'aaa' keys in them.
+ for (MapFile.Reader reader:
+ this.r.stores.get(COLUMN_FAMILY_TEXT_MINUS_COLON).readers.values()) {
+ reader.reset();
+ HStoreKey key = new HStoreKey();
+ ImmutableBytesWritable val = new ImmutableBytesWritable();
+ while(reader.next(key, val)) {
+ assertFalse(key.getRow().equals(STARTROW));
+ }
}
}
-
- private void createStoreFile(final HRegion r) throws IOException {
- HRegionLoader loader = new HRegionLoader(r);
- for (int i = 0; i < 3; i++) {
- addContent(loader, COLFAMILY_NAME1);
+
+ private void createStoreFile(final HRegion region) throws IOException {
+ HRegionIncommon loader = new HRegionIncommon(region);
+ for (int i = 0; i < 1; i++) {
+ addContent(loader, COLUMN_FAMILY);
}
- r.flushcache(false);
+ region.flushcache(false);
}
-}
\ No newline at end of file
+}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java?rev=574287&r1=574286&r2=574287&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestMasterAdmin.java Mon Sep 10 08:56:16 2007
@@ -45,32 +45,27 @@
admin.disableTable(testDesc.getName());
try {
- try {
- @SuppressWarnings("unused")
- HTable table = new HTable(conf, testDesc.getName());
+ @SuppressWarnings("unused")
+ HTable table = new HTable(conf, testDesc.getName());
- } catch(IllegalStateException e) {
- // Expected
- }
-
- admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:"));
- admin.enableTable(testDesc.getName());
- try {
- admin.deleteColumn(testDesc.getName(), new Text("col2:"));
-
- } catch(TableNotDisabledException e) {
- // Expected
- }
+ } catch(IllegalStateException e) {
+ // Expected
+
+ // This exception is not actually thrown. It doesn't look like it should
+ // be thrown since the connection manager is already filled w/ data
+ // -- noticed by St.Ack 09/09/2007
+ }
- admin.disableTable(testDesc.getName());
+ admin.addColumn(testDesc.getName(), new HColumnDescriptor("col2:"));
+ admin.enableTable(testDesc.getName());
+ try {
admin.deleteColumn(testDesc.getName(), new Text("col2:"));
-
- } catch(Exception e) {
- e.printStackTrace();
- fail();
-
- } finally {
- admin.deleteTable(testDesc.getName());
+ } catch(TableNotDisabledException e) {
+ // Expected
}
+
+ admin.disableTable(testDesc.getName());
+ admin.deleteColumn(testDesc.getName(), new Text("col2:"));
+ admin.deleteTable(testDesc.getName());
}
}