You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/04/12 12:39:58 UTC
svn commit: r764289 [5/8] - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/
src/java/org/apache/hadoop/hbase/io/hfile/ s...
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Apr 12 10:39:55 2009
@@ -22,21 +22,20 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.NavigableSet;
import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
@@ -50,17 +49,16 @@
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.SequenceFile;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
@@ -89,6 +87,12 @@
*/
public class Store implements HConstants {
static final Log LOG = LogFactory.getLog(Store.class);
+ /**
+ * Comparator that looks at columns and compares their family portions.
+ * Presumes columns have already been checked for presence of delimiter.
+ * If no delimiter present, presume the buffer holds a store name so no need
+ * of a delimiter.
+ */
protected final Memcache memcache;
// This stores directory in the filesystem.
private final Path homedir;
@@ -132,10 +136,9 @@
private final boolean bloomfilter;
private final Compression.Algorithm compression;
- // Comparing HStoreKeys in byte arrays.
- final HStoreKey.StoreKeyComparator rawcomparator;
- // Comparing HStoreKey objects.
- final Comparator<HStoreKey> comparator;
+ // Comparing KeyValues
+ final KeyValue.KVComparator comparator;
+ final KeyValue.KVComparator comparatorIgnoringType;
/**
* Constructor
@@ -151,7 +154,6 @@
* failed. Can be null.
* @throws IOException
*/
- @SuppressWarnings("unchecked")
protected Store(Path basedir, HRegionInfo info, HColumnDescriptor family,
FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
final Progressable reporter)
@@ -165,23 +167,16 @@
this.bloomfilter = family.isBloomfilter();
this.blocksize = family.getBlocksize();
this.compression = family.getCompression();
- this.rawcomparator = info.isRootRegion()?
- new HStoreKey.RootStoreKeyComparator(): info.isMetaRegion()?
- new HStoreKey.MetaStoreKeyComparator():
- new HStoreKey.StoreKeyComparator();
- this.comparator = info.isRootRegion()?
- new HStoreKey.HStoreKeyRootComparator(): info.isMetaRegion()?
- new HStoreKey.HStoreKeyMetaComparator():
- new HStoreKey.HStoreKeyComparator();
+ this.comparator = info.getComparator();
+ this.comparatorIgnoringType = this.comparator.getComparatorIgnoringType();
// getTimeToLive returns ttl in seconds. Convert to milliseconds.
this.ttl = family.getTimeToLive();
if (ttl != HConstants.FOREVER) {
this.ttl *= 1000;
}
- this.memcache = new Memcache(this.ttl, this.comparator, this.rawcomparator);
+ this.memcache = new Memcache(this.ttl, this.comparator);
this.compactionDir = HRegion.getCompactionDir(basedir);
- this.storeName = Bytes.toBytes(this.regioninfo.getEncodedName() + "/" +
- Bytes.toString(this.family.getName()));
+ this.storeName = this.family.getName();
this.storeNameStr = Bytes.toString(this.storeName);
// By default, we compact if an HStore has more than
@@ -293,17 +288,10 @@
// TODO: This could grow large and blow heap out. Need to get it into
// general memory usage accounting.
long maxSeqIdInLog = -1;
- NavigableMap<HStoreKey, byte []> reconstructedCache =
- new TreeMap<HStoreKey, byte []>(this.comparator);
- SequenceFile.Reader logReader = null;
- try {
- logReader = new SequenceFile.Reader(this.fs, reconstructionLog, this.conf);
- } catch (IOException e) {
- LOG.warn("Failed opening reconstruction log though check for null-size passed. " +
- "POSSIBLE DATA LOSS!! Soldiering on", e);
- return;
- }
-
+ ConcurrentSkipListSet<KeyValue> reconstructedCache =
+ Memcache.createSet(this.comparator);
+ SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
+ reconstructionLog, this.conf);
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
@@ -318,16 +306,15 @@
skippedEdits++;
continue;
}
- // Check this edit is for me. Also, guard against writing
+ // Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
- byte [] column = val.getColumn();
- if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN)
- || !Bytes.equals(key.getRegionName(), regioninfo.getRegionName())
- || !HStoreKey.matchingFamily(family.getName(), column)) {
+ KeyValue kv = val.getKeyValue();
+ if (val.isTransactionEntry() || kv.matchingColumnNoDelimiter(HLog.METACOLUMN) ||
+ !Bytes.equals(key.getRegionName(), regioninfo.getRegionName()) ||
+ !kv.matchingFamily(family.getName())) {
continue;
}
- HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
- reconstructedCache.put(k, val.getVal());
+ reconstructedCache.add(kv);
editsCount++;
// Every 2k edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
@@ -393,14 +380,13 @@
/**
* Adds a value to the memcache
*
- * @param key
- * @param value
+ * @param kv
* @return memcache size delta
*/
- protected long add(HStoreKey key, byte[] value) {
+ protected long add(final KeyValue kv) {
lock.readLock().lock();
try {
- return this.memcache.add(key, value);
+ return this.memcache.add(kv);
} finally {
lock.readLock().unlock();
}
@@ -456,7 +442,7 @@
boolean flushCache(final long logCacheFlushId) throws IOException {
// Get the snapshot to flush. Presumes that a call to
// this.memcache.snapshot() has happened earlier up in the chain.
- SortedMap<HStoreKey, byte []> cache = this.memcache.getSnapshot();
+ ConcurrentSkipListSet<KeyValue> cache = this.memcache.getSnapshot();
// If an exception happens flushing, we let it out without clearing
// the memcache snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
@@ -476,7 +462,7 @@
* @return StoreFile created.
* @throws IOException
*/
- private StoreFile internalFlushCache(final SortedMap<HStoreKey, byte []> cache,
+ private StoreFile internalFlushCache(final ConcurrentSkipListSet<KeyValue> cache,
final long logCacheFlushId)
throws IOException {
HFile.Writer writer = null;
@@ -494,13 +480,11 @@
writer = getWriter();
int entries = 0;
try {
- for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
- HStoreKey curkey = es.getKey();
- byte[] bytes = es.getValue();
- if (!isExpired(curkey, ttl, now)) {
- writer.append(curkey.getBytes(), bytes);
+ for (KeyValue kv: cache) {
+ if (!isExpired(kv, ttl, now)) {
+ writer.append(kv);
entries++;
- flushed += this.memcache.heapSize(curkey, bytes, null);
+ flushed += this.memcache.heapSize(kv, true);
}
}
// B. Write out the log sequence number that corresponds to this output
@@ -528,7 +512,7 @@
*/
HFile.Writer getWriter() throws IOException {
return StoreFile.getWriter(this.fs, this.homedir, this.blocksize,
- this.compression, this.rawcomparator, this.bloomfilter);
+ this.compression, this.comparator.getRawComparator(), this.bloomfilter);
}
/*
@@ -540,7 +524,7 @@
* @return Count of store files.
*/
private int updateStorefiles(final long logCacheFlushId,
- final StoreFile sf, final SortedMap<HStoreKey, byte []> cache)
+ final StoreFile sf, final NavigableSet<KeyValue> cache)
throws IOException {
int count = 0;
this.lock.writeLock().lock();
@@ -812,7 +796,7 @@
* @param done Which readers are done
* @return The lowest current key in passed <code>rdrs</code>
*/
- private int getLowestKey(final HFileScanner [] rdrs, final ByteBuffer [] keys,
+ private int getLowestKey(final HFileScanner [] rdrs, final KeyValue [] keys,
final boolean [] done) {
int lowestKey = -1;
for (int i = 0; i < rdrs.length; i++) {
@@ -822,10 +806,7 @@
if (lowestKey < 0) {
lowestKey = i;
} else {
- RawComparator<byte []> c = rdrs[i].getReader().getComparator();
- if (c.compare(keys[i].array(), keys[i].arrayOffset(), keys[i].limit(),
- keys[lowestKey].array(), keys[lowestKey].arrayOffset(),
- keys[lowestKey].limit()) < 0) {
+ if (this.comparator.compare(keys[i], keys[lowestKey]) < 0) {
lowestKey = i;
}
}
@@ -850,9 +831,8 @@
throws IOException {
// Reverse order so newest store file is first.
StoreFile[] files = reverse(pReaders);
- HFileScanner[] rdrs = new HFileScanner[files.length];
- ByteBuffer[] keys = new ByteBuffer[rdrs.length];
- ByteBuffer[] vals = new ByteBuffer[rdrs.length];
+ HFileScanner [] rdrs = new HFileScanner[files.length];
+ KeyValue [] kvs = new KeyValue[rdrs.length];
boolean[] done = new boolean[rdrs.length];
// Now, advance through the readers in order. This will have the
// effect of a run-time sort of the entire dataset.
@@ -863,29 +843,26 @@
if (done[i]) {
numDone++;
} else {
- keys[i] = rdrs[i].getKey();
- vals[i] = rdrs[i].getValue();
+ kvs[i] = rdrs[i].getKeyValue();
}
}
long now = System.currentTimeMillis();
int timesSeen = 0;
- HStoreKey lastSeen = new HStoreKey();
- HStoreKey lastDelete = null;
+ KeyValue lastSeen = KeyValue.LOWESTKEY;
+ KeyValue lastDelete = null;
+ int maxVersions = family.getMaxVersions();
while (numDone < done.length) {
// Get lowest key in all store files.
- int lowestKey = getLowestKey(rdrs, keys, done);
- // TODO: Suboptimal. And below where we are going from ByteBuffer to
- // byte array. FIX!! Can we get rid of HSK instantiations?
- HStoreKey hsk = HStoreKey.create(keys[lowestKey]);
+ int lowestKey = getLowestKey(rdrs, kvs, done);
+ KeyValue kv = kvs[lowestKey];
// If its same row and column as last key, increment times seen.
- if (HStoreKey.equalsTwoRowKeys(lastSeen.getRow(), hsk.getRow())
- && Bytes.equals(lastSeen.getColumn(), hsk.getColumn())) {
+ if (this.comparator.matchingRowColumn(lastSeen, kv)) {
timesSeen++;
// Reset last delete if not exact timestamp -- lastDelete only stops
// exactly the same key making it out to the compacted store file.
- if (lastDelete != null
- && lastDelete.getTimestamp() != hsk.getTimestamp()) {
+ if (lastDelete != null &&
+ lastDelete.getTimestamp() != kv.getTimestamp()) {
lastDelete = null;
}
} else {
@@ -894,26 +871,25 @@
}
// Don't write empty rows or columns. Only remove cells on major
- // compaction. Remove if expired of > VERSIONS
- if (hsk.getRow().length != 0 && hsk.getColumn().length != 0) {
- ByteBuffer value = vals[lowestKey];
+ // compaction. Remove if expired or > VERSIONS
+ if (kv.nonNullRowAndColumn()) {
if (!majorCompaction) {
// Write out all values if not a major compaction.
- compactedOut.append(hsk.getBytes(), Bytes.toBytes(value));
+ compactedOut.append(kv);
} else {
boolean expired = false;
boolean deleted = false;
- if (timesSeen <= family.getMaxVersions()
- && !(expired = isExpired(hsk, ttl, now))) {
+ if (timesSeen <= maxVersions && !(expired = isExpired(kv, ttl, now))) {
// If this value key is same as a deleted key, skip
- if (lastDelete != null && hsk.equals(lastDelete)) {
+ if (lastDelete != null &&
+ this.comparator.compare(kv, lastDelete) == 0) {
deleted = true;
- } else if (HLogEdit.isDeleted(value)) {
+ } else if (kv.isDeleteType()) {
// If a deleted value, skip
deleted = true;
- lastDelete = hsk;
+ lastDelete = kv;
} else {
- compactedOut.append(hsk.getBytes(), Bytes.toBytes(value));
+ compactedOut.append(kv);
}
}
if (expired || deleted) {
@@ -925,7 +901,7 @@
}
// Update last-seen items
- lastSeen = hsk;
+ lastSeen = kv;
// Advance the smallest key. If that reader's all finished, then
// mark it as done.
@@ -934,8 +910,7 @@
rdrs[lowestKey] = null;
numDone++;
} else {
- keys[lowestKey] = rdrs[lowestKey].getKey();
- vals[lowestKey] = rdrs[lowestKey].getValue();
+ kvs[lowestKey] = rdrs[lowestKey].getKeyValue();
}
}
}
@@ -1024,101 +999,138 @@
* row and timestamp, but not a column name.
*
* The returned object should map column names to Cells.
+ * @param key Where to start searching. Specifies a row and timestamp.
+ * Columns are specified in following arguments.
+ * @param columns Can be null which means get all
+ * @param columnPattern Can be null.
+ * @param numVersions
+ * @param versionsCounter Can be null.
+ * @param keyvalues
+ * @throws IOException
*/
- void getFull(HStoreKey key, final Set<byte []> columns,
- final int numVersions, Map<byte [], Cell> results)
+ public void getFull(KeyValue key, final NavigableSet<byte []> columns,
+ final Pattern columnPattern,
+ final int numVersions, Map<KeyValue, HRegion.Counter> versionsCounter,
+ List<KeyValue> keyvalues, final long now)
throws IOException {
- int versions = versionsToReturn(numVersions);
- // This is map of columns to timestamp
- Map<byte [], Long> deletes =
- new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
// if the key is null, we're not even looking for anything. return.
if (key == null) {
return;
}
-
+ int versions = versionsToReturn(numVersions);
+ NavigableSet<KeyValue> deletes =
+ new TreeSet<KeyValue>(this.comparatorIgnoringType);
+ // Create a Map that has results by column so we can keep count of versions.
+ // It duplicates columns but doing check of columns, we don't want to make
+ // column set each time.
this.lock.readLock().lock();
- // get from the memcache first.
- this.memcache.getFull(key, columns, versions, deletes, results);
try {
+ // get from the memcache first.
+ if (this.memcache.getFull(key, columns, columnPattern, versions,
+ versionsCounter, deletes, keyvalues, now)) {
+ // May have gotten enough results, enough to return.
+ return;
+ }
Map<Long, StoreFile> m = this.storefiles.descendingMap();
for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
i.hasNext();) {
- getFullFromStoreFile(i.next().getValue(), key, columns, versions, deletes, results);
+ if (getFullFromStoreFile(i.next().getValue(), key, columns,
+ columnPattern, versions, versionsCounter, deletes, keyvalues)) {
+ return;
+ }
}
} finally {
this.lock.readLock().unlock();
}
}
- private void getFullFromStoreFile(StoreFile f, HStoreKey key,
- Set<byte []> columns, int numVersions, Map<byte [], Long> deletes,
- Map<byte [], Cell> results)
+ /*
+ * @param f
+ * @param target Where to start searching. Specifies a row and timestamp.
+ * Columns are specified in following arguments.
+ * @param columns
+ * @param versions
+ * @param versionCounter
+ * @param deletes
+ * @param keyvalues
+ * @return True if we found enough results to satisfy the <code>versions</code>
+ * and <code>columns</code> passed.
+ * @throws IOException
+ */
+ private boolean getFullFromStoreFile(StoreFile f, KeyValue target,
+ Set<byte []> columns, final Pattern columnPattern, int versions,
+ Map<KeyValue, HRegion.Counter> versionCounter,
+ NavigableSet<KeyValue> deletes,
+ List<KeyValue> keyvalues)
throws IOException {
long now = System.currentTimeMillis();
HFileScanner scanner = f.getReader().getScanner();
- if (!getClosest(scanner, key.getBytes())) {
- return;
+ if (!getClosest(scanner, target)) {
+ return false;
}
+ boolean hasEnough = false;
do {
- HStoreKey readkey = HStoreKey.create(scanner.getKey());
- byte[] readcol = readkey.getColumn();
-
- // if we're looking for this column (or all of them), and there isn't
- // already a value for this column in the results map or there is a value
- // but we haven't collected enough versions yet, and the key we
- // just read matches, then we'll consider it
- if ((columns == null || columns.contains(readcol)) &&
- (!results.containsKey(readcol) ||
- results.get(readcol).getNumValues() < numVersions) &&
- key.matchesWithoutColumn(readkey)) {
- // if the value of the cell we're looking at right now is a delete,
- // we need to treat it differently
- ByteBuffer value = scanner.getValue();
- if (HLogEdit.isDeleted(value)) {
- // if it's not already recorded as a delete or recorded with a more
- // recent delete timestamp, record it for later
- if (!deletes.containsKey(readcol)
- || deletes.get(readcol).longValue() < readkey.getTimestamp()) {
- deletes.put(readcol, Long.valueOf(readkey.getTimestamp()));
- }
- } else if (!(deletes.containsKey(readcol) && deletes.get(readcol)
- .longValue() >= readkey.getTimestamp())) {
- // So the cell itself isn't a delete, but there may be a delete
- // pending from earlier in our search. Only record this result if
- // there aren't any pending deletes.
- if (!(deletes.containsKey(readcol) && deletes.get(readcol)
- .longValue() >= readkey.getTimestamp())) {
- if (!isExpired(readkey, ttl, now)) {
- if (!results.containsKey(readcol)) {
- results.put(readcol, new Cell(value, readkey.getTimestamp()));
- } else {
- results.get(readcol).add(Bytes.toBytes(value),
- readkey.getTimestamp());
- }
- }
- }
- }
- } else if (this.rawcomparator.compareRows(key.getRow(), 0,
- key.getRow().length,
- readkey.getRow(), 0, readkey.getRow().length) < 0) {
- // if we've crossed into the next row, then we can just stop
- // iterating
+ KeyValue kv = scanner.getKeyValue();
+ // Make sure we have not passed out the row. If target key has a
+ // column on it, then we are looking explicit key+column combination. If
+ // we've passed it out, also break.
+ if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
+ !this.comparator.matchingRowColumn(target, kv)) {
+ break;
+ }
+ if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
+ continue;
+ }
+ if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
+ this.ttl, keyvalues, null)) {
+ hasEnough = true;
break;
}
} while (scanner.next());
+ return hasEnough;
+ }
+
+ /**
+ * Code shared by {@link Memcache#getFull(KeyValue, NavigableSet, Pattern, int, Map, NavigableSet, List, long)}
+ * and {@link #getFullFromStoreFile(StoreFile, KeyValue, Set, Pattern, int, Map, NavigableSet, List)}
+ * @param c
+ * @param target
+ * @param candidate
+ * @param columns
+ * @param columnPattern
+ * @return True if <code>candidate</code> matches column and timestamp.
+ */
+ static boolean getFullCheck(final KeyValue.KVComparator c,
+ final KeyValue target, final KeyValue candidate,
+ final Set<byte []> columns, final Pattern columnPattern) {
+ // Does column match?
+ if (!Store.matchingColumns(candidate, columns)) {
+ return false;
+ }
+ // if the column pattern is not null, we use it for column matching.
+ // we will skip the keys whose column doesn't match the pattern.
+ if (columnPattern != null) {
+ if (!(columnPattern.matcher(candidate.getColumnString()).matches())) {
+ return false;
+ }
+ }
+ if (c.compareTimestamps(target, candidate) > 0) {
+ return false;
+ }
+ return true;
}
/*
* @param wantedVersions How many versions were asked for.
- * @return wantedVersions or this families' MAX_VERSIONS.
+ * @return wantedVersions or this family's VERSIONS.
*/
private int versionsToReturn(final int wantedVersions) {
if (wantedVersions <= 0) {
throw new IllegalArgumentException("Number of versions must be > 0");
}
// Make sure we do not return more than maximum versions for this store.
- return wantedVersions > this.family.getMaxVersions()?
+ return wantedVersions > this.family.getMaxVersions() &&
+ wantedVersions != HConstants.ALL_VERSIONS?
this.family.getMaxVersions(): wantedVersions;
}
@@ -1132,7 +1144,8 @@
* @return values for the specified versions
* @throws IOException
*/
- Cell[] get(final HStoreKey key, final int numVersions) throws IOException {
+ List<KeyValue> get(final KeyValue key, final int numVersions)
+ throws IOException {
// This code below is very close to the body of the getKeys method. Any
// changes in the flow below should also probably be done in getKeys.
// TODO: Refactor so same code used.
@@ -1146,78 +1159,48 @@
// than one version. This List of deletes should not be large since we
// are only keeping rows and columns that match those set on the get and
// which have delete values. If memory usage becomes an issue, could
- // redo as bloom filter.
- Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+ // redo as bloom filter. Use sorted set because test for membership should
+ // be faster than calculating a hash. Use a comparator that ignores ts.
+ NavigableSet<KeyValue> deletes =
+ new TreeSet<KeyValue>(this.comparatorIgnoringType);
+ List<KeyValue> keyvalues = new ArrayList<KeyValue>();
this.lock.readLock().lock();
try {
// Check the memcache
- List<Cell> results = this.memcache.get(key, versions, deletes, now);
- // If we got sufficient versions from memcache, return.
- if (results.size() == versions) {
- return results.toArray(new Cell[results.size()]);
+ if (this.memcache.get(key, versions, keyvalues, deletes, now)) {
+ return keyvalues;
}
Map<Long, StoreFile> m = this.storefiles.descendingMap();
- byte [] keyBytes = key.getBytes();
- for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
- i.hasNext() && !hasEnoughVersions(versions, results);) {
- StoreFile f = i.next().getValue();
+ boolean hasEnough = false;
+ for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
+ StoreFile f = e.getValue();
HFileScanner scanner = f.getReader().getScanner();
- if (!getClosest(scanner, keyBytes)) {
- continue;
- }
- HStoreKey readkey = HStoreKey.create(scanner.getKey());
- if (!readkey.matchesRowCol(key)) {
+ if (!getClosest(scanner, key)) {
+ // Move to next file.
continue;
}
- if (get(readkey, scanner.getValue(), versions, results, deletes, now)) {
- break;
- }
- while (scanner.next()) {
- readkey = HStoreKey.create(scanner.getKey());
- if (!readkey.matchesRowCol(key)) {
- break;
- }
- if (get(readkey, scanner.getValue(), versions, results, deletes, now)) {
+ do {
+ KeyValue kv = scanner.getKeyValue();
+ // Make sure below matches what happens up in Memcache#get.
+ if (this.comparator.matchingRowColumn(kv, key)) {
+ if (doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues, null)) {
+ hasEnough = true;
+ break;
+ }
+ } else {
+ // Row and column don't match. Must have gone past. Move to next file.
break;
}
+ } while (scanner.next());
+ if (hasEnough) {
+ break; // Break out of files loop.
}
}
- return results.size() == 0 ?
- null : results.toArray(new Cell[results.size()]);
+ return keyvalues.isEmpty()? null: keyvalues;
} finally {
this.lock.readLock().unlock();
}
}
-
- /*
- * Look at one key/value.
- * @param key
- * @param value
- * @param versions
- * @param results
- * @param deletes
- * @param now
- * @return True if we have enough versions.
- */
- private boolean get(final HStoreKey key, ByteBuffer value,
- final int versions, final List<Cell> results,
- final Set<HStoreKey> deletes, final long now) {
- if (!HLogEdit.isDeleted(value)) {
- if (notExpiredAndNotInDeletes(this.ttl, key, now, deletes)) {
- results.add(new Cell(value, key.getTimestamp()));
- }
- // Perhaps only one version is wanted. I could let this
- // test happen later in the for loop test but it would cost
- // the allocation of an ImmutableBytesWritable.
- if (hasEnoughVersions(versions, results)) {
- return true;
- }
- } else {
- // Is this copy necessary?
- deletes.add(new HStoreKey(key));
- }
- return false;
- }
/*
* Small method to check if we are over the max number of versions
@@ -1227,87 +1210,109 @@
* @param c
* @return
*/
- private boolean hasEnoughVersions(final int versions, final List<Cell> c) {
- return c.size() >= versions;
+ static boolean hasEnoughVersions(final int versions, final List<KeyValue> c) {
+ return versions > 0 && !c.isEmpty() && c.size() >= versions;
}
- /**
- * Get <code>versions</code> of keys matching the origin key's
- * row/column/timestamp and those of an older vintage.
- * @param origin Where to start searching.
- * @param versions How many versions to return. Pass
- * {@link HConstants#ALL_VERSIONS} to retrieve all.
+ /*
+ * Used when doing getFulls.
+ * @param kv
+ * @param versions
+ * @param versionCounter
+ * @param columns
+ * @param deletes
* @param now
- * @param columnPattern regex pattern for column matching. if columnPattern
- * is not null, we use column pattern to match columns. And the columnPattern
- * only works when origin's column is null or its length is zero.
- * @return Matching keys.
- * @throws IOException
- */
- public List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
- final long now, final Pattern columnPattern)
- throws IOException {
- // This code below is very close to the body of the get method. Any
- // changes in the flow below should also probably be done in get.
- // TODO: Refactor so same code used.
- Set<HStoreKey> deletes = new HashSet<HStoreKey>();
- this.lock.readLock().lock();
- try {
- // Check the memcache
- List<HStoreKey> keys =
- this.memcache.getKeys(origin, versions, deletes, now, columnPattern);
- // If we got sufficient versions from memcache, return.
- if (keys.size() >= versions) {
- return keys;
- }
- Map<Long, StoreFile> m = this.storefiles.descendingMap();
- for (Iterator<Map.Entry<Long, StoreFile>> i = m.entrySet().iterator();
- i.hasNext() && keys.size() < versions;) {
- StoreFile f = i.next().getValue();
- HFileScanner scanner = f.getReader().getScanner();
- if (!getClosest(scanner, origin.getBytes())) {
- continue;
+ * @param ttl
+ * @param keyvalues
+ * @param set
+ * @return True if enough versions.
+ */
+ static boolean doKeyValue(final KeyValue kv,
+ final int versions,
+ final Map<KeyValue, Counter> versionCounter,
+ final Set<byte []> columns,
+ final NavigableSet<KeyValue> deletes,
+ final long now,
+ final long ttl,
+ final List<KeyValue> keyvalues,
+ final SortedSet<KeyValue> set) {
+ boolean hasEnough = false;
+ if (kv.isDeleteType()) {
+ if (!deletes.contains(kv)) {
+ deletes.add(kv);
+ }
+ } else if (!deletes.contains(kv)) {
+ // Skip expired cells
+ if (!isExpired(kv, ttl, now)) {
+ if (HRegion.okToAddResult(kv, versions, versionCounter)) {
+ HRegion.addResult(kv, versionCounter, keyvalues);
+ if (HRegion.hasEnoughVersions(versions, versionCounter, columns)) {
+ hasEnough = true;
+ }
}
- do {
- HStoreKey readkey = HStoreKey.create(scanner.getKey());
- // if the row and column matches, we might want this one.
- if (rowMatches(origin, readkey)) {
- // if the column pattern is not null, we use it for column matching.
- // we will skip the keys whose column doesn't match the pattern.
- if (columnPattern != null) {
- if (!(columnPattern.
- matcher(Bytes.toString(readkey.getColumn())).matches())) {
- continue;
- }
- }
- // if the cell address matches, then we definitely want this key.
- if (cellMatches(origin, readkey)) {
- ByteBuffer readval = scanner.getValue();
- // Store key if isn't deleted or superceded by memcache
- if (!HLogEdit.isDeleted(readval)) {
- if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
- keys.add(readkey);
- }
- if (keys.size() >= versions) {
- break;
- }
- } else {
- deletes.add(readkey);
- }
- } else {
- // the cell doesn't match, but there might be more with different
- // timestamps, so move to the next key
- continue;
- }
- } else {
- // the row doesn't match, so we've gone too far.
- break;
+ } else {
+ // Remove the expired.
+ Store.expiredOrDeleted(set, kv);
+ }
+ }
+ return hasEnough;
+ }
+
+ /*
+ * Used when doing get.
+ * @param kv
+ * @param versions
+ * @param deletes
+ * @param now
+ * @param ttl
+ * @param keyvalues
+ * @param set
+ * @return True if enough versions.
+ */
+ static boolean doKeyValue(final KeyValue kv, final int versions,
+ final NavigableSet<KeyValue> deletes,
+ final long now, final long ttl,
+ final List<KeyValue> keyvalues, final SortedSet<KeyValue> set) {
+ boolean hasEnough = false;
+ if (!kv.isDeleteType()) {
+ // Filter out expired results
+ if (notExpiredAndNotInDeletes(ttl, kv, now, deletes)) {
+ if (!keyvalues.contains(kv)) {
+ keyvalues.add(kv);
+ if (hasEnoughVersions(versions, keyvalues)) {
+ hasEnough = true;
}
- } while (scanner.next()); // advance to the next key
+ }
+ } else {
+ if (set != null) {
+ expiredOrDeleted(set, kv);
+ }
}
- return keys;
- } finally {
- this.lock.readLock().unlock();
+ } else {
+ // Cell holds a delete value.
+ deletes.add(kv);
+ }
+ return hasEnough;
+ }
+
+ /*
+ * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
+ * has an empty column, then it just tests row equivalence. Otherwise, it uses
+ * the passed comparator's <code>matchingRowColumn</code>.
+ * @param c Comparator to use.
+ * @param origin Key we're testing against
+ * @param target Key we're testing
+ */
+ static boolean matchingRowColumn(final KeyValue.KVComparator c,
+ final KeyValue origin, final KeyValue target) {
+ return origin.isEmptyColumn()? c.matchingRows(target, origin):
+ c.matchingRowColumn(target, origin);
+ }
+
+ static void expiredOrDeleted(final Set<KeyValue> set, final KeyValue kv) {
+ boolean b = set.remove(kv);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(kv.toString() + " expired: " + b);
}
}
@@ -1316,19 +1321,21 @@
* preceeds it. WARNING: Only use this method on a table where writes occur
* with stricly increasing timestamps. This method assumes this pattern of
* writes in order to make it reasonably performant.
- * @param row
- * @return Found row
+ * @param targetkey
+ * @return Found keyvalue
* @throws IOException
*/
- byte [] getRowKeyAtOrBefore(final byte [] row)
+ KeyValue getRowKeyAtOrBefore(final KeyValue targetkey)
throws IOException{
- // Map of HStoreKeys that are candidates for holding the row key that
+ // Map of keys that are candidates for holding the row key that
// most closely matches what we're looking for. We'll have to update it as
// deletes are found all over the place as we go along before finally
- // reading the best key out of it at the end.
- NavigableMap<HStoreKey, Long> candidateKeys =
- new TreeMap<HStoreKey, Long>(this.comparator);
-
+ // reading the best key out of it at the end. Use a comparator that
+ // ignores key types. Otherwise, we can't remove deleted items doing
+ // set.remove because of the differing type between insert and delete.
+ NavigableSet<KeyValue> candidates =
+ new TreeSet<KeyValue>(this.comparator.getComparatorIgnoringType());
+
// Keep a list of deleted cell keys. We need this because as we go through
// the store files, the cell with the delete marker may be in one file and
// the old non-delete cell value in a later store file. If we don't keep
@@ -1337,22 +1344,22 @@
// This List of deletes should not be large since we are only keeping rows
// and columns that match those set on the scanner and which have delete
// values. If memory usage becomes an issue, could redo as bloom filter.
- Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+ NavigableSet<KeyValue> deletes =
+ new TreeSet<KeyValue>(this.comparatorIgnoringType);
+ long now = System.currentTimeMillis();
this.lock.readLock().lock();
try {
// First go to the memcache. Pick up deletes and candidates.
- this.memcache.getRowKeyAtOrBefore(row, candidateKeys, deletes);
+ this.memcache.getRowKeyAtOrBefore(targetkey, candidates, deletes, now);
// Process each store file. Run through from newest to oldest.
- // This code below is very close to the body of the getKeys method.
Map<Long, StoreFile> m = this.storefiles.descendingMap();
for (Map.Entry<Long, StoreFile> e: m.entrySet()) {
// Update the candidate keys from the current map file
- rowAtOrBeforeFromStoreFile(e.getValue(), row, candidateKeys, deletes);
+ rowAtOrBeforeFromStoreFile(e.getValue(), targetkey, candidates,
+ deletes, now);
}
// Return the best key from candidateKeys
- byte [] result =
- candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
- return result;
+ return candidates.isEmpty()? null: candidates.last();
} finally {
this.lock.readLock().unlock();
}
@@ -1362,123 +1369,92 @@
* Check an individual MapFile for the row at or before a given key
* and timestamp
* @param f
- * @param row
- * @param candidateKeys
+ * @param targetkey
+ * @param candidates Pass a Set with a Comparator that
+ * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
+ * with a different Type to the candidate key.
* @throws IOException
*/
private void rowAtOrBeforeFromStoreFile(final StoreFile f,
- final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
- final Set<HStoreKey> deletes)
+ final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
+ final NavigableSet<KeyValue> deletes, final long now)
throws IOException {
- HFileScanner scanner = f.getReader().getScanner();
- // TODO: FIX THIS PROFLIGACY!!!
- if (!scanner.seekBefore(new HStoreKey(row).getBytes())) {
- return;
- }
- long now = System.currentTimeMillis();
- HStoreKey startKey = HStoreKey.create(scanner.getKey());
// if there aren't any candidate keys yet, we'll do some things different
- if (candidateKeys.isEmpty()) {
- rowAtOrBeforeCandidate(startKey, f, row, candidateKeys, deletes, now);
+ if (candidates.isEmpty()) {
+ rowAtOrBeforeCandidate(f, targetkey, candidates, deletes, now);
} else {
- rowAtOrBeforeWithCandidates(startKey, f, row, candidateKeys, deletes, now);
+ rowAtOrBeforeWithCandidates(f, targetkey, candidates, deletes, now);
}
}
-
- /* Find a candidate for row that is at or before passed row in passed
- * mapfile.
- * @param startKey First key in the mapfile.
- * @param map
- * @param row
- * @param candidateKeys
- * @param now
- * @throws IOException
- */
- private void rowAtOrBeforeCandidate(final HStoreKey startKey,
- final StoreFile f, final byte[] row,
- final SortedMap<HStoreKey, Long> candidateKeys,
- final Set<HStoreKey> deletes, final long now)
- throws IOException {
- // if the row we're looking for is past the end of this mapfile, set the
- // search key to be the last key. If its a deleted key, then we'll back
- // up to the row before and return that.
- HStoreKey finalKey = HStoreKey.create(f.getReader().getLastKey());
- HStoreKey searchKey = null;
- if (this.rawcomparator.compareRows(finalKey.getRow(), 0,
- finalKey.getRow().length,
- row, 0, row.length) < 0) {
- searchKey = finalKey;
- } else {
- searchKey = new HStoreKey(row);
- if (this.comparator.compare(searchKey, startKey) < 0) {
- searchKey = startKey;
- }
- }
- rowAtOrBeforeCandidate(f, searchKey, candidateKeys, deletes, now);
- }
/*
* @param ttlSetting
* @param hsk
* @param now
- * @param deletes
+ * @param deletes A Set whose Comparator ignores Type.
* @return True if key has not expired and is not in passed set of deletes.
*/
static boolean notExpiredAndNotInDeletes(final long ttl,
- final HStoreKey hsk, final long now, final Set<HStoreKey> deletes) {
- return !isExpired(hsk, ttl, now) &&
- (deletes == null || !deletes.contains(hsk));
+ final KeyValue key, final long now, final Set<KeyValue> deletes) {
+ return !isExpired(key, ttl, now) && (deletes == null || deletes.isEmpty() ||
+ !deletes.contains(key));
}
-
- static boolean isExpired(final HStoreKey hsk, final long ttl,
+
+ static boolean isExpired(final KeyValue key, final long ttl,
final long now) {
- return ttl != HConstants.FOREVER && now > hsk.getTimestamp() + ttl;
+ return ttl != HConstants.FOREVER && now > key.getTimestamp() + ttl;
}
- /* Find a candidate for row that is at or before passed key, sk, in mapfile.
+ /* Find a candidate for row that is at or before passed key, targetkey, in hfile.
* @param f
- * @param sk Key to go search the mapfile with.
- * @param candidateKeys
+ * @param targetkey Key to go search the hfile with.
+ * @param candidates
* @param now
* @throws IOException
* @see {@link #rowAtOrBeforeCandidate(HStoreKey, org.apache.hadoop.io.MapFile.Reader, byte[], SortedMap, long)}
*/
private void rowAtOrBeforeCandidate(final StoreFile f,
- final HStoreKey sk, final SortedMap<HStoreKey, Long> candidateKeys,
- final Set<HStoreKey> deletes, final long now)
+ final KeyValue targetkey, final NavigableSet<KeyValue> candidates,
+ final NavigableSet<KeyValue> deletes, final long now)
throws IOException {
- HStoreKey searchKey = sk;
- HStoreKey readkey = null;
- HStoreKey knownNoGoodKey = null;
+ KeyValue search = targetkey;
+ // If the row we're looking for is past the end of this mapfile, set the
+ // search key to be the last key. If its a deleted key, then we'll back
+ // up to the row before and return that.
+ // TODO: Cache last key as KV over in the file.
+ byte [] lastkey = f.getReader().getLastKey();
+ KeyValue lastKeyValue =
+ KeyValue.createKeyValueFromKey(lastkey, 0, lastkey.length);
+ if (this.comparator.compareRows(lastKeyValue, targetkey) < 0) {
+ search = lastKeyValue;
+ }
+ KeyValue knownNoGoodKey = null;
HFileScanner scanner = f.getReader().getScanner();
for (boolean foundCandidate = false; !foundCandidate;) {
// Seek to the exact row, or the one that would be immediately before it
- int result = scanner.seekTo(searchKey.getBytes());
+ int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
+ search.getKeyLength());
if (result < 0) {
// Not in file.
continue;
}
- HStoreKey deletedOrExpiredRow = null;
+ KeyValue deletedOrExpiredRow = null;
+ KeyValue kv = null;
do {
- readkey = HStoreKey.create(scanner.getKey());
- ByteBuffer value = scanner.getValue();
- // If we have an exact match on row, and it's not a delete, save this
- // as a candidate key
- if (HStoreKey.equalsTwoRowKeys(readkey.getRow(), searchKey.getRow())) {
- if (!HLogEdit.isDeleted(value)) {
- if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
+ kv = scanner.getKeyValue();
+ if (this.comparator.compareRows(kv, search) <= 0) {
+ if (!kv.isDeleteType()) {
+ if (handleNonDelete(kv, now, deletes, candidates)) {
foundCandidate = true;
// NOTE! Continue.
continue;
}
}
- HStoreKey copy = addCopyToDeletes(readkey, deletes);
+ deletes.add(kv);
if (deletedOrExpiredRow == null) {
- deletedOrExpiredRow = copy;
+ deletedOrExpiredRow = kv;
}
- } else if (this.rawcomparator.compareRows(readkey.getRow(), 0,
- readkey.getRow().length,
- searchKey.getRow(), 0, searchKey.getRow().length) > 0) {
+ } else if (this.comparator.compareRows(kv, search) > 0) {
// if the row key we just read is beyond the key we're searching for,
// then we're done.
break;
@@ -1486,30 +1462,32 @@
// So, the row key doesn't match, but we haven't gone past the row
// we're seeking yet, so this row is a candidate for closest
// (assuming that it isn't a delete).
- if (!HLogEdit.isDeleted(value)) {
- if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
+ if (!kv.isDeleteType()) {
+ if (handleNonDelete(kv, now, deletes, candidates)) {
foundCandidate = true;
// NOTE: Continue
continue;
}
}
- HStoreKey copy = addCopyToDeletes(readkey, deletes);
+ deletes.add(kv);
if (deletedOrExpiredRow == null) {
- deletedOrExpiredRow = copy;
+ deletedOrExpiredRow = kv;
}
}
} while(scanner.next() && (knownNoGoodKey == null ||
- this.comparator.compare(readkey, knownNoGoodKey) < 0));
+ this.comparator.compare(kv, knownNoGoodKey) < 0));
// If we get here and have no candidates but we did find a deleted or
// expired candidate, we need to look at the key before that
if (!foundCandidate && deletedOrExpiredRow != null) {
knownNoGoodKey = deletedOrExpiredRow;
- if (!scanner.seekBefore(deletedOrExpiredRow.getBytes())) {
- // Is this right?
+ if (!scanner.seekBefore(deletedOrExpiredRow.getBuffer(),
+ deletedOrExpiredRow.getKeyOffset(),
+ deletedOrExpiredRow.getKeyLength())) {
+ // Not in file -- what can I do now but break?
break;
}
- searchKey = HStoreKey.create(scanner.getKey());
+ search = scanner.getKeyValue();
} else {
// No candidates and no deleted or expired candidates. Give up.
break;
@@ -1520,55 +1498,40 @@
// without going "past" the key we're searching for. we can just fall
// through here.
}
-
- /*
- * @param key Key to copy and add to <code>deletes</code>
- * @param deletes
- * @return Instance of the copy added to <code>deletes</code>
- */
- private HStoreKey addCopyToDeletes(final HStoreKey key,
- final Set<HStoreKey> deletes) {
- HStoreKey copy = new HStoreKey(key);
- deletes.add(copy);
- return copy;
- }
- private void rowAtOrBeforeWithCandidates(final HStoreKey startKey,
- final StoreFile f, final byte[] row,
- final SortedMap<HStoreKey, Long> candidateKeys,
- final Set<HStoreKey> deletes, final long now)
+ private void rowAtOrBeforeWithCandidates(final StoreFile f,
+ final KeyValue targetkey,
+ final NavigableSet<KeyValue> candidates,
+ final NavigableSet<KeyValue> deletes, final long now)
throws IOException {
// if there are already candidate keys, we need to start our search
// at the earliest possible key so that we can discover any possible
// deletes for keys between the start and the search key. Back up to start
// of the row in case there are deletes for this candidate in this mapfile
// BUT do not backup before the first key in the store file.
- // TODO: FIX THIS PROFLIGATE OBJECT MAKING!!!
- HStoreKey firstCandidateKey = candidateKeys.firstKey();
- byte [] searchKey = null;
- HStoreKey.StoreKeyComparator c =
- (HStoreKey.StoreKeyComparator)f.getReader().getComparator();
- if (c.compareRows(firstCandidateKey.getRow(), startKey.getRow()) < 0) {
- searchKey = startKey.getBytes();
+ KeyValue firstCandidateKey = candidates.first();
+ KeyValue search = null;
+ if (this.comparator.compareRows(firstCandidateKey, targetkey) < 0) {
+ search = targetkey;
} else {
- searchKey = new HStoreKey(firstCandidateKey.getRow()).getBytes();
+ search = firstCandidateKey;
}
// Seek to the exact row, or the one that would be immediately before it
HFileScanner scanner = f.getReader().getScanner();
- int result = scanner.seekTo(searchKey);
+ int result = scanner.seekTo(search.getBuffer(), search.getKeyOffset(),
+ search.getKeyLength());
if (result < 0) {
// Key is before start of this file. Return.
return;
}
do {
- HStoreKey k = HStoreKey.create(scanner.getKey());
- ByteBuffer v = scanner.getValue();
+ KeyValue kv = scanner.getKeyValue();
// if we have an exact match on row, and it's not a delete, save this
// as a candidate key
- if (HStoreKey.equalsTwoRowKeys(k.getRow(), row)) {
- handleKey(k, v, now, deletes, candidateKeys);
- } else if (this.rawcomparator.compareRows(k.getRow(), row) > 0 ) {
+ if (this.comparator.matchingRows(kv, targetkey)) {
+ handleKey(kv, now, deletes, candidates);
+ } else if (this.comparator.compareRows(kv, targetkey) > 0 ) {
// if the row key we just read is beyond the key we're searching for,
// then we're done.
break;
@@ -1576,109 +1539,61 @@
// So, the row key doesn't match, but we haven't gone past the row
// we're seeking yet, so this row is a candidate for closest
// (assuming that it isn't a delete).
- handleKey(k, v, now, deletes, candidateKeys);
+ handleKey(kv, now, deletes, candidates);
}
} while(scanner.next());
}
/*
+ * Used when calculating keys at or just before a passed key.
* @param readkey
* @param now
- * @param deletes
- * @param candidateKeys
+ * @param deletes Set with Comparator that ignores key type.
+ * @param candidates Set with Comparator that ignores key type.
*/
- private void handleKey(final HStoreKey readkey, ByteBuffer value,
- final long now, final Set<HStoreKey> deletes,
- final SortedMap<HStoreKey, Long> candidateKeys) {
- if (!HLogEdit.isDeleted(value)) {
- handleNonDelete(readkey, now, deletes, candidateKeys);
+ private void handleKey(final KeyValue readkey, final long now,
+ final NavigableSet<KeyValue> deletes,
+ final NavigableSet<KeyValue> candidates) {
+ if (!readkey.isDeleteType()) {
+ handleNonDelete(readkey, now, deletes, candidates);
} else {
- // Pass copy because readkey will change next time next is called.
- handleDeleted(new HStoreKey(readkey), candidateKeys, deletes);
+ handleDeletes(readkey, candidates, deletes);
}
}
/*
+ * Used when calculating keys at or just before a passed key.
* @param readkey
* @param now
- * @param deletes
- * @param candidateKeys
+ * @param deletes Set with Comparator that ignores key type.
+ * @param candidates Set with Comparator that ignores key type.
* @return True if we added a candidate.
*/
- private boolean handleNonDelete(final HStoreKey readkey, final long now,
- final Set<HStoreKey> deletes, final Map<HStoreKey, Long> candidateKeys) {
+ private boolean handleNonDelete(final KeyValue readkey, final long now,
+ final NavigableSet<KeyValue> deletes,
+ final NavigableSet<KeyValue> candidates) {
if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
- candidateKeys.put(stripTimestamp(readkey),
- Long.valueOf(readkey.getTimestamp()));
+ candidates.add(readkey);
return true;
}
return false;
}
- /* Handle keys whose values hold deletes.
+ /**
+ * Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that
- * might match by timestamp, then check for a match and remove it if it's too
- * young to survive the delete
- * @param k Be careful; if key was gotten from a Mapfile, pass in a copy.
- * Values gotten by 'nexting' out of Mapfiles will change in each invocation.
- * @param candidateKeys
+ * might match, then check for a match and remove it. Implies candidates
+ * is made with a Comparator that ignores key type.
+ * @param k
+ * @param candidates
* @param deletes
+ * @return True if we removed <code>k</code> from <code>candidates</code>.
*/
- static void handleDeleted(final HStoreKey k,
- final SortedMap<HStoreKey, Long> candidateKeys,
- final Set<HStoreKey> deletes) {
+ static boolean handleDeletes(final KeyValue k,
+ final NavigableSet<KeyValue> candidates,
+ final NavigableSet<KeyValue> deletes) {
deletes.add(k);
- HStoreKey strippedKey = stripTimestamp(k);
- if (candidateKeys.containsKey(strippedKey)) {
- long bestCandidateTs =
- candidateKeys.get(strippedKey).longValue();
- if (bestCandidateTs <= k.getTimestamp()) {
- candidateKeys.remove(strippedKey);
- }
- }
- }
-
- static HStoreKey stripTimestamp(HStoreKey key) {
- return new HStoreKey(key.getRow(), key.getColumn());
- }
-
- /*
- * Test that the <i>target</i> matches the <i>origin</i> cell address. If the
- * <i>origin</i> has an empty column, then it's assumed to mean any column
- * matches and only match on row and timestamp. Otherwise, it compares the
- * keys with HStoreKey.matchesRowCol().
- * @param origin The key we're testing against
- * @param target The key we're testing
- */
- private boolean cellMatches(HStoreKey origin, HStoreKey target){
- // if the origin's column is empty, then we're matching any column
- if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
- // if the row matches, then...
- if (HStoreKey.equalsTwoRowKeys(target.getRow(), origin.getRow())) {
- // check the timestamp
- return target.getTimestamp() <= origin.getTimestamp();
- }
- return false;
- }
- // otherwise, we want to match on row and column
- return target.matchesRowCol(origin);
- }
-
- /*
- * Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
- * has an empty column, then it just tests row equivalence. Otherwise, it uses
- * HStoreKey.matchesRowCol().
- * @param origin Key we're testing against
- * @param target Key we're testing
- */
- private boolean rowMatches(final HStoreKey origin, final HStoreKey target){
- // if the origin's column is empty, then we're matching any column
- if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
- // if the row matches, then...
- return HStoreKey.equalsTwoRowKeys(target.getRow(), origin.getRow());
- }
- // otherwise, we want to match on row and column
- return target.matchesRowCol(origin);
+ return candidates.remove(k);
}
/**
@@ -1701,18 +1616,18 @@
long maxSize = 0L;
Long mapIndex = Long.valueOf(0L);
for (Map.Entry<Long, StoreFile> e: storefiles.entrySet()) {
- StoreFile curHSF = e.getValue();
+ StoreFile sf = e.getValue();
if (splitable) {
- splitable = !curHSF.isReference();
+ splitable = !sf.isReference();
if (!splitable) {
// RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return.
if (LOG.isDebugEnabled()) {
- LOG.debug(curHSF + " is not splittable");
+ LOG.debug(sf + " is not splittable");
}
return null;
}
}
- long size = curHSF.getReader().length();
+ long size = sf.getReader().length();
if (size > maxSize) {
// This is the largest one so far
maxSize = size;
@@ -1726,13 +1641,15 @@
// the row we want to split on as midkey.
byte [] midkey = r.midkey();
if (midkey != null) {
- HStoreKey mk = HStoreKey.create(midkey);
- HStoreKey firstKey = HStoreKey.create(r.getFirstKey());
- HStoreKey lastKey = HStoreKey.create(r.getLastKey());
+ KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
+ byte [] fk = r.getFirstKey();
+ KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
+ byte [] lk = r.getLastKey();
+ KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
// if the midkey is the same as the first and last keys, then we cannot
// (ever) split this region.
- if (HStoreKey.equalsTwoRowKeys(mk.getRow(), firstKey.getRow()) &&
- HStoreKey.equalsTwoRowKeys( mk.getRow(), lastKey.getRow())) {
+ if (this.comparator.compareRows(mk, firstKey) == 0 &&
+ this.comparator.compareRows(mk, lastKey) == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot split because midkey is the same as first or " +
"last row");
@@ -1761,7 +1678,8 @@
/**
* Return a scanner for both the memcache and the HStore files
*/
- protected InternalScanner getScanner(long timestamp, byte [][] targetCols,
+ protected InternalScanner getScanner(long timestamp,
+ final NavigableSet<byte []> targetCols,
byte [] firstRow, RowFilterInterface filter)
throws IOException {
lock.readLock().lock();
@@ -1798,22 +1716,23 @@
/*
* Datastructure that holds size and row to split a file around.
+ * TODO: Take a KeyValue rather than row.
*/
static class StoreSize {
private final long size;
- private final byte[] key;
- StoreSize(long size, byte[] key) {
+ private final byte [] row;
+
+ StoreSize(long size, byte [] row) {
this.size = size;
- this.key = new byte[key.length];
- System.arraycopy(key, 0, this.key, 0, key.length);
+ this.row = row;
}
/* @return the size */
long getSize() {
return size;
}
- /* @return the key */
- byte[] getSplitRow() {
- return key;
+
+ byte [] getSplitRow() {
+ return this.row;
}
}
@@ -1825,15 +1744,16 @@
* Convenience method that implements the old MapFile.getClosest on top of
* HFile Scanners. getClosest used seek to the asked-for key or just after
* (HFile seeks to the key or just before).
- * @param s
- * @param b
+ * @param s Scanner to use
+ * @param kv Key to find.
* @return True if we were able to seek the scanner to <code>b</code> or to
* the key just after.
* @throws IOException
*/
- static boolean getClosest(final HFileScanner s, final byte [] b)
+ static boolean getClosest(final HFileScanner s, final KeyValue kv)
throws IOException {
- int result = s.seekTo(b);
+ // Pass offsets to key content of a KeyValue; that's what's in the hfile index.
+ int result = s.seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength());
if (result < 0) {
// Not in file. Will the first key do?
if (!s.seekTo()) {
@@ -1849,4 +1769,25 @@
}
return true;
}
+
+ /**
+ * @param kv
+ * @param columns Can be null
+ * @return True if column matches.
+ */
+ static boolean matchingColumns(final KeyValue kv, final Set<byte []> columns) {
+ if (columns == null) {
+ return true;
+ }
+ // Only instantiate the column (kv.getColumn()) if there are lots of columns to test.
+ if (columns.size() > 100) {
+ return columns.contains(kv.getColumn());
+ }
+ for (byte [] column: columns) {
+ if (kv.matchingColumn(column)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Sun Apr 12 10:39:55 2009
@@ -32,7 +32,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HalfHFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -254,29 +254,17 @@
@Override
protected String toStringFirstKey() {
- String result = "";
- try {
- result = HStoreKey.create(getFirstKey()).toString();
- } catch (IOException e) {
- LOG.warn("Failed toString first key", e);
- }
- return result;
+ return KeyValue.keyToString(getFirstKey());
}
@Override
protected String toStringLastKey() {
- String result = "";
- try {
- result = HStoreKey.create(getLastKey()).toString();
- } catch (IOException e) {
- LOG.warn("Failed toString last key", e);
- }
- return result;
+ return KeyValue.keyToString(getLastKey());
}
}
/**
- * Override to add some customization on HalfHFileReader
+ * Override to add some customization on HalfHFileReader.
*/
static class HalfStoreFileReader extends HalfHFileReader {
public HalfStoreFileReader(FileSystem fs, Path p, BlockCache c, Reference r)
@@ -291,24 +279,12 @@
@Override
protected String toStringFirstKey() {
- String result = "";
- try {
- result = HStoreKey.create(getFirstKey()).toString();
- } catch (IOException e) {
- LOG.warn("Failed toString first key", e);
- }
- return result;
+ return KeyValue.keyToString(getFirstKey());
}
@Override
protected String toStringLastKey() {
- String result = "";
- try {
- result = HStoreKey.create(getLastKey()).toString();
- } catch (IOException e) {
- LOG.warn("Failed toString last key", e);
- }
- return result;
+ return KeyValue.keyToString(getLastKey());
}
}
@@ -398,7 +374,7 @@
*/
public static HFile.Writer getWriter(final FileSystem fs, final Path dir,
final int blocksize, final Compression.Algorithm algorithm,
- final HStoreKey.StoreKeyComparator c, final boolean bloomfilter)
+ final KeyValue.KeyComparator c, final boolean bloomfilter)
throws IOException {
if (!fs.exists(dir)) {
fs.mkdirs(dir);
@@ -406,7 +382,7 @@
Path path = getUniqueFile(fs, dir);
return new HFile.Writer(fs, path, blocksize,
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
- c == null? new HStoreKey.StoreKeyComparator(): c, bloomfilter);
+ c == null? KeyValue.KEY_COMPARATOR: c, bloomfilter);
}
/**
@@ -501,7 +477,7 @@
final StoreFile f, final byte [] splitRow, final Reference.Range range)
throws IOException {
// A reference to the bottom half of the hsf store file.
- Reference r = new Reference(new HStoreKey(splitRow).getBytes(), range);
+ Reference r = new Reference(splitRow, range);
// Add the referred-to regions name as a dot separated suffix.
// See REF_NAME_PARSER regex above. The referred-to regions name is
// up in the path of the passed in <code>f</code> -- parentdir is family,
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Sun Apr 12 10:39:55 2009
@@ -21,18 +21,15 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
+import java.util.NavigableSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.util.Bytes;
/**
* A scanner that iterates through HStore files
@@ -40,9 +37,7 @@
class StoreFileScanner extends HAbstractScanner
implements ChangedReadersObserver {
// Keys retrieved from the sources
- private volatile HStoreKey keys[];
- // Values that correspond to those keys
- private ByteBuffer [] vals;
+ private volatile KeyValue keys[];
// Readers we go against.
private volatile HFileScanner [] scanners;
@@ -52,18 +47,21 @@
// Used around replacement of Readers if they change while we're scanning.
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
+
+ private final long now = System.currentTimeMillis();
+
/**
* @param store
* @param timestamp
- * @param targetCols
+ * @param columns
* @param firstRow
+ * @param deletes Set of running deletes
* @throws IOException
*/
public StoreFileScanner(final Store store, final long timestamp,
- final byte [][] targetCols, final byte [] firstRow)
+ final NavigableSet<byte []> columns, final byte [] firstRow)
throws IOException {
- super(timestamp, targetCols);
+ super(timestamp, columns);
this.store = store;
this.store.addChangedReaderObserver(this);
try {
@@ -75,7 +73,7 @@
throw e;
}
}
-
+
/*
* Go open new scanners and cue them at <code>firstRow</code>.
* Closes existing Readers if any.
@@ -90,12 +88,13 @@
s.add(f.getReader().getScanner());
}
this.scanners = s.toArray(new HFileScanner [] {});
- this.keys = new HStoreKey[this.scanners.length];
- this.vals = new ByteBuffer[this.scanners.length];
+ this.keys = new KeyValue[this.scanners.length];
// Advance the readers to the first pos.
+ KeyValue firstKey = (firstRow != null && firstRow.length > 0)?
+ new KeyValue(firstRow, HConstants.LATEST_TIMESTAMP): null;
for (int i = 0; i < this.scanners.length; i++) {
- if (firstRow != null && firstRow.length != 0) {
- if (findFirstRow(i, firstRow)) {
+ if (firstKey != null) {
+ if (seekTo(i, firstKey)) {
continue;
}
}
@@ -118,7 +117,7 @@
* @throws IOException
*/
boolean columnMatch(int i) throws IOException {
- return columnMatch(keys[i].getColumn());
+ return columnMatch(keys[i]);
}
/**
@@ -132,7 +131,7 @@
* @see org.apache.hadoop.hbase.regionserver.InternalScanner#next(org.apache.hadoop.hbase.HStoreKey, java.util.SortedMap)
*/
@Override
- public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+ public boolean next(List<KeyValue> results)
throws IOException {
if (this.scannerClosed) {
return false;
@@ -140,84 +139,63 @@
this.lock.readLock().lock();
try {
// Find the next viable row label (and timestamp).
- ViableRow viableRow = getNextViableRow();
+ KeyValue viable = getNextViableRow();
+ if (viable == null) {
+ return false;
+ }
// Grab all the values that match this row/timestamp
- boolean insertedItem = false;
- if (viableRow.getRow() != null) {
- key.setRow(viableRow.getRow());
- key.setVersion(viableRow.getTimestamp());
- for (int i = 0; i < keys.length; i++) {
- // Fetch the data
- while ((keys[i] != null) &&
- (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- viableRow.getRow()) == 0)) {
- // If we are doing a wild card match or there are multiple matchers
- // per column, we need to scan all the older versions of this row
- // to pick up the rest of the family members
- if(!isWildcardScanner()
- && !isMultipleMatchScanner()
- && (keys[i].getTimestamp() != viableRow.getTimestamp())) {
- break;
- }
- if(columnMatch(i)) {
- // We only want the first result for any specific family member
- if(!results.containsKey(keys[i].getColumn())) {
- results.put(keys[i].getColumn(),
- new Cell(vals[i], keys[i].getTimestamp()));
- insertedItem = true;
- }
- }
-
- if (!getNext(i)) {
- closeSubScanner(i);
+ boolean addedItem = false;
+ for (int i = 0; i < keys.length; i++) {
+ // Fetch the data
+ while ((keys[i] != null) &&
+ (this.store.comparator.compareRows(this.keys[i], viable) == 0)) {
+ // If we are doing a wild card match or there are multiple matchers
+ // per column, we need to scan all the older versions of this row
+ // to pick up the rest of the family members
+ if(!isWildcardScanner()
+ && !isMultipleMatchScanner()
+ && (keys[i].getTimestamp() != viable.getTimestamp())) {
+ break;
+ }
+ if (columnMatch(i)) {
+ // We only want the first result for any specific family member
+ // TODO: Do we have to keep a running list of column entries in
+ // the results across all of the StoreScanner? Like we do
+ // doing getFull?
+ if (!results.contains(keys[i])) {
+ results.add(keys[i]);
+ addedItem = true;
}
}
- // Advance the current scanner beyond the chosen row, to
- // a valid timestamp, so we're ready next time.
- while ((keys[i] != null) &&
- ((this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- viableRow.getRow()) <= 0) ||
- (keys[i].getTimestamp() > this.timestamp) ||
- (! columnMatch(i)))) {
- getNext(i);
+
+ if (!getNext(i)) {
+ closeSubScanner(i);
}
}
+ // Advance the current scanner beyond the chosen row, to
+ // a valid timestamp, so we're ready next time.
+ while ((keys[i] != null) &&
+ ((this.store.comparator.compareRows(this.keys[i], viable) <= 0) ||
+ (keys[i].getTimestamp() > this.timestamp) ||
+ !columnMatch(i))) {
+ getNext(i);
+ }
}
- return insertedItem;
+ return addedItem;
} finally {
this.lock.readLock().unlock();
}
}
- // Data stucture to hold next, viable row (and timestamp).
- static class ViableRow {
- private final byte [] row;
- private final long ts;
-
- ViableRow(final byte [] r, final long t) {
- this.row = r;
- this.ts = t;
- }
-
- byte [] getRow() {
- return this.row;
- }
-
- long getTimestamp() {
- return this.ts;
- }
- }
-
/*
* @return An instance of <code>ViableRow</code>
* @throws IOException
*/
- private ViableRow getNextViableRow() throws IOException {
+ private KeyValue getNextViableRow() throws IOException {
// Find the next viable row label (and timestamp).
- byte [] viableRow = null;
+ KeyValue viable = null;
long viableTimestamp = -1;
- long now = System.currentTimeMillis();
long ttl = store.ttl;
for (int i = 0; i < keys.length; i++) {
// The first key that we find that matches may have a timestamp greater
@@ -235,15 +213,12 @@
// If we get here and keys[i] is not null, we already know that the
// column matches and the timestamp of the row is less than or equal
// to this.timestamp, so we do not need to test that here
- && ((viableRow == null) ||
- (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- viableRow) < 0) ||
- ((this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- viableRow) == 0) &&
+ && ((viable == null) ||
+ (this.store.comparator.compareRows(this.keys[i], viable) < 0) ||
+ ((this.store.comparator.compareRows(this.keys[i], viable) == 0) &&
(keys[i].getTimestamp() > viableTimestamp)))) {
if (ttl == HConstants.FOREVER || now < keys[i].getTimestamp() + ttl) {
- viableRow = keys[i].getRow();
- viableTimestamp = keys[i].getTimestamp();
+ viable = keys[i];
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("getNextViableRow :" + keys[i] + ": expired, skipped");
@@ -251,7 +226,7 @@
}
}
}
- return new ViableRow(viableRow, viableTimestamp);
+ return viable;
}
/*
@@ -260,30 +235,25 @@
*
* @param i which iterator to advance
* @param firstRow seek to this row
- * @return true if this is the first row or if the row was not found
+ * @return true if the seek failed and sub-scanner <code>i</code> was closed
+ * (exhausted); otherwise true only if the key we landed on has not expired.
*/
- private boolean findFirstRow(int i, final byte [] firstRow) throws IOException {
- if (firstRow == null || firstRow.length <= 0) {
+ private boolean seekTo(int i, final KeyValue firstKey)
+ throws IOException {
+ if (firstKey == null) {
if (!this.scanners[i].seekTo()) {
closeSubScanner(i);
return true;
}
} else {
- if (!Store.getClosest(this.scanners[i], HStoreKey.getBytes(firstRow))) {
+ // TODO: sort columns and pass in column as part of key so we get closer.
+ if (!Store.getClosest(this.scanners[i], firstKey)) {
closeSubScanner(i);
return true;
}
}
- this.keys[i] = HStoreKey.create(this.scanners[i].getKey());
- this.vals[i] = this.scanners[i].getValue();
- long now = System.currentTimeMillis();
- long ttl = store.ttl;
- if (ttl != HConstants.FOREVER && now >= this.keys[i].getTimestamp() + ttl) {
- // Didn't find it. Close the scanner and return TRUE
- closeSubScanner(i);
- return true;
- }
- return columnMatch(i);
+ this.keys[i] = this.scanners[i].getKeyValue();
+ return isGoodKey(this.keys[i]);
}
/**
@@ -294,34 +264,33 @@
*/
private boolean getNext(int i) throws IOException {
boolean result = false;
- long now = System.currentTimeMillis();
- long ttl = store.ttl;
while (true) {
if ((this.scanners[i].isSeeked() && !this.scanners[i].next()) ||
(!this.scanners[i].isSeeked() && !this.scanners[i].seekTo())) {
closeSubScanner(i);
break;
}
- this.keys[i] = HStoreKey.create(this.scanners[i].getKey());
- if (keys[i].getTimestamp() <= this.timestamp) {
- if (ttl == HConstants.FOREVER || now < keys[i].getTimestamp() + ttl) {
- vals[i] = this.scanners[i].getValue();
+ this.keys[i] = this.scanners[i].getKeyValue();
+ if (isGoodKey(this.keys[i])) {
result = true;
break;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("getNext: " + keys[i] + ": expired, skipped");
- }
}
}
return result;
}
+ /*
+ * @param kv Key to test.
+ * @return True if <code>kv</code> has not expired given this store's ttl.
+ */
+ private boolean isGoodKey(final KeyValue kv) {
+ return !Store.isExpired(kv, this.store.ttl, this.now);
+ }
+
/** Close down the indicated reader. */
private void closeSubScanner(int i) {
this.scanners[i] = null;
this.keys[i] = null;
- this.vals[i] = null;
}
/** Shut it down! */
@@ -346,11 +315,10 @@
// The keys are currently lined up at the next row to fetch. Pass in
// the current row as 'first' row and readers will be opened and cue'd
// up so future call to next will start here.
- ViableRow viableRow = getNextViableRow();
- openScanner(viableRow.getRow());
+ KeyValue viable = getNextViableRow();
+ openScanner(viable.getRow());
LOG.debug("Replaced Scanner Readers at row " +
- (viableRow == null || viableRow.getRow() == null? "null":
- Bytes.toString(viableRow.getRow())));
+ viable.getRow().toString());
} finally {
this.lock.writeLock().unlock();
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Sun Apr 12 10:39:55 2009
@@ -21,20 +21,18 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
+import java.util.NavigableSet;
+import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -43,15 +41,14 @@
class StoreScanner implements InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
- private InternalScanner[] scanners;
- private TreeMap<byte [], Cell>[] resultSets;
- private HStoreKey[] keys;
+ private InternalScanner [] scanners;
+ private List<KeyValue> [] resultSets;
private boolean wildcardMatch = false;
private boolean multipleMatchers = false;
private RowFilterInterface dataFilter;
private Store store;
private final long timestamp;
- private final byte [][] targetCols;
+ private final NavigableSet<byte []> columns;
// Indices for memcache scanner and hstorefile scanner.
private static final int MEMS_INDEX = 0;
@@ -62,11 +59,11 @@
// Used to indicate that the scanner has closed (see HBASE-1107)
private final AtomicBoolean closing = new AtomicBoolean(false);
-
+
/** Create an Scanner with a handle on the memcache and HStore files. */
@SuppressWarnings("unchecked")
- StoreScanner(Store store, byte [][] targetCols, byte [] firstRow,
- long timestamp, RowFilterInterface filter)
+ StoreScanner(Store store, final NavigableSet<byte []> targetCols,
+ byte [] firstRow, long timestamp, RowFilterInterface filter)
throws IOException {
this.store = store;
this.dataFilter = filter;
@@ -74,12 +71,11 @@
dataFilter.reset();
}
this.scanners = new InternalScanner[2];
- this.resultSets = new TreeMap[scanners.length];
- this.keys = new HStoreKey[scanners.length];
+ this.resultSets = new List[scanners.length];
// Save these args in case we need them later handling change in readers
// See updateReaders below.
this.timestamp = timestamp;
- this.targetCols = targetCols;
+ this.columns = targetCols;
try {
scanners[MEMS_INDEX] =
store.memcache.getScanner(timestamp, targetCols, firstRow);
@@ -98,7 +94,6 @@
for (int i = MEMS_INDEX; i < scanners.length; i++) {
setupScanner(i);
}
-
this.store.addChangedReaderObserver(this);
}
@@ -120,10 +115,8 @@
* @throws IOException
*/
private void setupScanner(final int i) throws IOException {
- this.keys[i] = new HStoreKey();
- this.resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
- if (this.scanners[i] != null && !this.scanners[i].next(this.keys[i],
- this.resultSets[i])) {
+ this.resultSets[i] = new ArrayList<KeyValue>();
+ if (this.scanners[i] != null && !this.scanners[i].next(this.resultSets[i])) {
closeScanner(i);
}
}
@@ -138,7 +131,7 @@
return this.multipleMatchers;
}
- public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+ public boolean next(List<KeyValue> results)
throws IOException {
this.lock.readLock().lock();
try {
@@ -148,100 +141,82 @@
boolean moreToFollow = true;
while (filtered && moreToFollow) {
// Find the lowest-possible key.
- byte [] chosenRow = null;
+ KeyValue chosen = null;
long chosenTimestamp = -1;
- for (int i = 0; i < this.keys.length; i++) {
+ for (int i = 0; i < this.scanners.length; i++) {
+ KeyValue kv = this.resultSets[i] == null || this.resultSets[i].isEmpty()?
+ null: this.resultSets[i].get(0);
+ if (kv == null) {
+ continue;
+ }
if (scanners[i] != null &&
- (chosenRow == null ||
- (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- chosenRow) < 0) ||
- ((this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- chosenRow) == 0) &&
- (keys[i].getTimestamp() > chosenTimestamp)))) {
- chosenRow = keys[i].getRow();
- chosenTimestamp = keys[i].getTimestamp();
+ (chosen == null ||
+ (this.store.comparator.compareRows(kv, chosen) < 0) ||
+ ((this.store.comparator.compareRows(kv, chosen) == 0) &&
+ (kv.getTimestamp() > chosenTimestamp)))) {
+ chosen = kv;
+ chosenTimestamp = chosen.getTimestamp();
}
}
-
+
// Filter whole row by row key?
- filtered = dataFilter != null? dataFilter.filterRowKey(chosenRow) : false;
+ filtered = dataFilter == null || chosen == null? false:
+ dataFilter.filterRowKey(chosen.getBuffer(), chosen.getRowOffset(),
+ chosen.getRowLength());
- // Store the key and results for each sub-scanner. Merge them as
- // appropriate.
+ // Store results for each sub-scanner.
if (chosenTimestamp >= 0 && !filtered) {
- // Here we are setting the passed in key with current row+timestamp
- key.setRow(chosenRow);
- key.setVersion(chosenTimestamp);
- key.setColumn(HConstants.EMPTY_BYTE_ARRAY);
- // Keep list of deleted cell keys within this row. We need this
- // because as we go through scanners, the delete record may be in an
- // early scanner and then the same record with a non-delete, non-null
- // value in a later. Without history of what we've seen, we'll return
- // deleted values. This List should not ever grow too large since we
- // are only keeping rows and columns that match those set on the
- // scanner and which have delete values. If memory usage becomes a
- // problem, could redo as bloom filter.
- Set<HStoreKey> deletes = new HashSet<HStoreKey>();
+ NavigableSet<KeyValue> deletes =
+ new TreeSet<KeyValue>(this.store.comparatorIgnoringType);
for (int i = 0; i < scanners.length && !filtered; i++) {
- while ((scanners[i] != null && !filtered && moreToFollow) &&
- (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- chosenRow) == 0)) {
- // If we are doing a wild card match or there are multiple
- // matchers per column, we need to scan all the older versions of
- // this row to pick up the rest of the family members
- if (!wildcardMatch
- && !multipleMatchers
- && (keys[i].getTimestamp() != chosenTimestamp)) {
- break;
+ if ((scanners[i] != null && !filtered && moreToFollow &&
+ this.resultSets[i] != null && !this.resultSets[i].isEmpty())) {
+ // Test that this result set is for the 'chosen' row.
+ KeyValue firstkv = resultSets[i].get(0);
+ if (!this.store.comparator.matchingRows(firstkv, chosen)) {
+ continue;
}
-
- // NOTE: We used to do results.putAll(resultSets[i]);
- // but this had the effect of overwriting newer
- // values with older ones. So now we only insert
- // a result if the map does not contain the key.
- HStoreKey hsk = new HStoreKey(key.getRow(),
- HConstants.EMPTY_BYTE_ARRAY,
- key.getTimestamp());
- for (Map.Entry<byte [], Cell> e : resultSets[i].entrySet()) {
- hsk.setColumn(e.getKey());
- if (HLogEdit.isDeleted(e.getValue().getValue())) {
- // Only first key encountered is added; deletes is a Set.
- deletes.add(new HStoreKey(hsk));
- } else if ((deletes.size() == 0 || !deletes.contains(hsk)) &&
- !filtered &&
- moreToFollow &&
- !results.containsKey(e.getKey())) {
- if (dataFilter != null) {
+ // It's for the 'chosen' row; process it.
+ for (KeyValue kv: resultSets[i]) {
+ if (kv.isDeleteType()) {
+ deletes.add(kv);
+ } else if ((deletes.isEmpty() || !deletes.contains(kv)) &&
+ !filtered && moreToFollow && !results.contains(kv)) {
+ if (this.dataFilter != null) {
// Filter whole row by column data?
- filtered = dataFilter.filterColumn(chosenRow, e.getKey(),
- e.getValue().getValue());
+ int rowlength = kv.getRowLength();
+ int columnoffset = kv.getColumnOffset(rowlength);
+ filtered = dataFilter.filterColumn(kv.getBuffer(),
+ kv.getRowOffset(), rowlength,
+ kv.getBuffer(), columnoffset, kv.getColumnLength(columnoffset),
+ kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
if (filtered) {
results.clear();
break;
}
}
- results.put(e.getKey(), e.getValue());
+ results.add(kv);
+ /* REMOVED BECAUSE THERE COULD BE A BUNCH OF DELETES IN RESULTS
+ AND WE WANT TO INCLUDE THEM -- the short-circuit below is
+ probably not wanted.
+ // If we are doing a wild card match or there are multiple
+ // matchers per column, we need to scan all the older versions of
+ // this row to pick up the rest of the family members
+ if (!wildcardMatch && !multipleMatchers &&
+ (kv.getTimestamp() != chosenTimestamp)) {
+ break;
+ }
+ */
}
}
+ // Move on to next row.
resultSets[i].clear();
- if (!scanners[i].next(keys[i], resultSets[i])) {
+ if (!scanners[i].next(resultSets[i])) {
closeScanner(i);
}
}
}
}
- for (int i = 0; i < scanners.length; i++) {
- // If the current scanner is non-null AND has a lower-or-equal
- // row label, then its timestamp is bad. We need to advance it.
- while ((scanners[i] != null) &&
- (this.store.rawcomparator.compareRows(this.keys[i].getRow(),
- chosenRow) <= 0)) {
- resultSets[i].clear();
- if (!scanners[i].next(keys[i], resultSets[i])) {
- closeScanner(i);
- }
- }
- }
moreToFollow = chosenTimestamp >= 0;
if (dataFilter != null) {
@@ -249,8 +224,8 @@
moreToFollow = false;
}
}
-
- if (results.size() <= 0 && !filtered) {
+
+ if (results.isEmpty() && !filtered) {
// There were no results found for this row. Marked it as
// 'filtered'-out otherwise we will not move on to the next row.
filtered = true;
@@ -258,7 +233,7 @@
}
// If we got no results, then there is no more to follow.
- if (results == null || results.size() <= 0) {
+ if (results == null || results.isEmpty()) {
moreToFollow = false;
}
@@ -276,18 +251,18 @@
this.lock.readLock().unlock();
}
}
-
+
/** Shut down a single scanner */
void closeScanner(int i) {
try {
try {
scanners[i].close();
} catch (IOException e) {
- LOG.warn(Bytes.toString(store.storeName) + " failed closing scanner " + i, e);
+ LOG.warn(Bytes.toString(store.storeName) + " failed closing scanner " +
+ i, e);
}
} finally {
scanners[i] = null;
- keys[i] = null;
resultSets[i] = null;
}
}
@@ -321,8 +296,9 @@
try {
// I think its safe getting key from mem at this stage -- it shouldn't have
// been flushed yet
+ // TODO: MAKE SURE WE UPDATE FROM TRUNK.
this.scanners[HSFS_INDEX] = new StoreFileScanner(this.store,
- this.timestamp, this. targetCols, this.keys[MEMS_INDEX].getRow());
+ this.timestamp, this. columns, this.resultSets[MEMS_INDEX].get(0).getRow());
checkScannerFlags(HSFS_INDEX);
setupScanner(HSFS_INDEX);
LOG.debug("Added a StoreFileScanner to outstanding HStoreScanner");