You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/04/12 12:39:58 UTC
svn commit: r764289 [4/8] - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/filter/ src/java/org/apache/hadoop/hbase/io/
src/java/org/apache/hadoop/hbase/io/hfile/ s...
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java?rev=764289&r1=764288&r2=764289&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/Memcache.java Sun Apr 12 10:39:55 2009
@@ -25,122 +25,135 @@
import java.lang.management.RuntimeMXBean;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
+import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HStoreKey;
-import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.HRegion.Counter;
import org.apache.hadoop.hbase.util.Bytes;
-
/**
- * The Memcache holds in-memory modifications to the HRegion.
- * Keeps a current map. When asked to flush the map, current map is moved
- * to snapshot and is cleared. We continue to serve edits out of new map
+ * The Memcache holds in-memory modifications to the HRegion. Modifications
+ * are {@link KeyValue}s. When asked to flush, current memcache is moved
+ * to snapshot and is cleared. We continue to serve edits out of new memcache
* and backing snapshot until flusher reports in that the flush succeeded. At
* this point we let the snapshot go.
+ * TODO: Adjust size of the memcache when we remove items because they have
+ * been deleted.
*/
class Memcache {
private static final Log LOG = LogFactory.getLog(Memcache.class);
-
+
private final long ttl;
// Note that since these structures are always accessed with a lock held,
- // so no additional synchronization is required.
+ // no additional synchronization is required.
- // The currently active sorted map of edits.
- private volatile SortedMap<HStoreKey, byte[]> memcache;
+ // The currently active sorted set of edits. Using explicit type because
+ // if I use NavigableSet, I lose some facility -- I can't get a NavigableSet
+ // when I do tailSet or headSet.
+ volatile ConcurrentSkipListSet<KeyValue> memcache;
// Snapshot of memcache. Made for flusher.
- private volatile SortedMap<HStoreKey, byte[]> snapshot;
+ volatile ConcurrentSkipListSet<KeyValue> snapshot;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final Comparator<HStoreKey> comparator;
- private final HStoreKey.StoreKeyComparator rawcomparator;
+ final KeyValue.KVComparator comparator;
+
+ // Used comparing versions -- same r/c and ts but different type.
+ final KeyValue.KVComparator comparatorIgnoreType;
+
+ // Used comparing versions -- same r/c and type but different timestamp.
+ final KeyValue.KVComparator comparatorIgnoreTimestamp;
+
+ // TODO: Fix this guess by studying jprofiler
+ private final static int ESTIMATED_KV_HEAP_TAX = 60;
/**
* Default constructor. Used for tests.
*/
- @SuppressWarnings("unchecked")
public Memcache() {
- this(HConstants.FOREVER, new HStoreKey.HStoreKeyComparator(),
- new HStoreKey.StoreKeyComparator());
+ this(HConstants.FOREVER, KeyValue.COMPARATOR);
}
/**
* Constructor.
* @param ttl The TTL for cache entries, in milliseconds.
- * @param c
- * @param rc
+ * @param c Comparator used to order the memcache and snapshot sets.
*/
- public Memcache(final long ttl, final Comparator<HStoreKey> c,
- final HStoreKey.StoreKeyComparator rc) {
+ public Memcache(final long ttl, final KeyValue.KVComparator c) {
this.ttl = ttl;
this.comparator = c;
- this.rawcomparator = rc;
- this.memcache = createSynchronizedSortedMap(c);
- this.snapshot = createSynchronizedSortedMap(c);
+ this.comparatorIgnoreTimestamp =
+ this.comparator.getComparatorIgnoringTimestamps();
+ this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
+ this.memcache = createSet(c);
+ this.snapshot = createSet(c);
}
- /*
- * Utility method using HSKWritableComparator
- * @return synchronized sorted map of HStoreKey to byte arrays.
- */
- private SortedMap<HStoreKey, byte[]> createSynchronizedSortedMap(final Comparator<HStoreKey> c) {
- return Collections.synchronizedSortedMap(new TreeMap<HStoreKey, byte []>(c));
+ static ConcurrentSkipListSet<KeyValue> createSet(final KeyValue.KVComparator c) {
+ return new ConcurrentSkipListSet<KeyValue>(c);
+ }
+
+ void dump() {
+ for (KeyValue kv: this.memcache) {
+ LOG.info(kv);
+ }
+ for (KeyValue kv: this.snapshot) {
+ LOG.info(kv);
+ }
}
/**
* Creates a snapshot of the current Memcache.
* Snapshot must be cleared by call to {@link #clearSnapshot(SortedMap)}
- * To get the snapshot made by this method, use
- * {@link #getSnapshot}.
+ * To get the snapshot made by this method, use {@link #getSnapshot}.
*/
void snapshot() {
this.lock.writeLock().lock();
try {
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
- if (this.snapshot.size() > 0) {
- LOG.debug("Snapshot called again without clearing previous. " +
+ if (!this.snapshot.isEmpty()) {
+ LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
// We used to synchronize on the memcache here but we're inside a
// write lock so removed it. Comment is left in case removal was a
// mistake. St.Ack
- if (this.memcache.size() != 0) {
+ if (!this.memcache.isEmpty()) {
this.snapshot = this.memcache;
- this.memcache = createSynchronizedSortedMap(this.comparator);
+ this.memcache = createSet(this.comparator);
}
}
} finally {
this.lock.writeLock().unlock();
}
}
-
+
/**
* Return the current snapshot.
* Called by flusher to get current snapshot made by a previous
* call to {@link snapshot}.
* @return Return snapshot.
* @see {@link #snapshot()}
- * @see {@link #clearSnapshot(SortedMap)}
+ * @see {@link #clearSnapshot(Set)}
*/
- SortedMap<HStoreKey, byte[]> getSnapshot() {
+ ConcurrentSkipListSet<KeyValue> getSnapshot() {
return this.snapshot;
}
@@ -150,7 +163,7 @@
* @throws UnexpectedException
* @see {@link #snapshot()}
*/
- void clearSnapshot(final SortedMap<HStoreKey, byte []> ss)
+ void clearSnapshot(final Set<KeyValue> ss)
throws UnexpectedException {
this.lock.writeLock().lock();
try {
@@ -160,8 +173,8 @@
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
- if (ss.size() != 0) {
- this.snapshot = createSynchronizedSortedMap(this.comparator);
+ if (!ss.isEmpty()) {
+ this.snapshot = createSet(this.comparator);
}
} finally {
this.lock.writeLock().unlock();
@@ -170,143 +183,121 @@
/**
* Write an update
- * @param key
- * @param value
- * @return memcache Approximate size of the passed key and value. Includes
- * cost of hosting HSK and byte arrays as well as the Map.Entry this addition
- * costs when we insert into the backing TreeMap.
+ * @param kv
+ * @return approximate size of the passed key and value.
*/
- long add(final HStoreKey key, final byte[] value) {
+ long add(final KeyValue kv) {
long size = -1;
this.lock.readLock().lock();
try {
- byte [] oldValue = this.memcache.remove(key);
- this.memcache.put(key, value);
- size = heapSize(key, value, oldValue);
+ boolean notpresent = this.memcache.add(kv);
+ size = heapSize(kv, notpresent);
} finally {
this.lock.readLock().unlock();
}
return size;
}
-
+
/*
- * Calcuate how the memcache size has changed, approximately.
- * Add in tax of TreeMap.Entry.
- * @param key
- * @param value
- * @param oldValue
- * @return
- */
- long heapSize(final HStoreKey key, final byte [] value,
- final byte [] oldValue) {
- // First add value length.
- long keySize = key.heapSize();
- // Add value.
- long size = value == null? 0: value.length;
- if (oldValue == null) {
- size += keySize;
+ * Calculate how the memcache size has changed, approximately. Be careful.
+ * If class changes, be sure to change the size calculation.
+ * Add in tax of Map.Entry.
+ * @param kv
+ * @param notpresent True if the kv was NOT present in the set.
+ * @return Size
+ */
+ long heapSize(final KeyValue kv, final boolean notpresent) {
+ return notpresent?
// Add overhead for value byte array and for Map.Entry -- 57 bytes
// on x64 according to jprofiler.
- size += Bytes.ESTIMATED_HEAP_TAX + 57;
- } else {
- // If old value, don't add overhead again nor key size. Just add
- // difference in value sizes.
- size -= oldValue.length;
- }
- return size;
+ ESTIMATED_KV_HEAP_TAX + 57 + kv.getLength(): 0; // Guess no change in size.
}
/**
* Look back through all the backlog TreeMaps to find the target.
- * @param key
+ * @param kv
* @param numVersions
- * @return An array of byte arrays ordered by timestamp.
+ * @return List of KeyValues. Empty, not null, if no results.
*/
- List<Cell> get(final HStoreKey key, final int numVersions) {
- return get(key, numVersions, null, System.currentTimeMillis());
+ List<KeyValue> get(final KeyValue kv, final int numVersions) {
+ List<KeyValue> results = new ArrayList<KeyValue>();
+ get(kv, numVersions, results,
+ new TreeSet<KeyValue>(this.comparatorIgnoreType),
+ System.currentTimeMillis());
+ return results;
}
-
+
/**
* Look back through all the backlog TreeMaps to find the target.
* @param key
- * @param numVersions
- * @param deletes
+ * @param versions
+ * @param results
+ * @param deletes Pass a Set that has a Comparator that ignores key type.
* @param now
- * @return An array of byte arrays ordered by timestamp.
+ * @return True if enough versions.
*/
- List<Cell> get(final HStoreKey key, final int numVersions,
- final Set<HStoreKey> deletes, final long now) {
+ boolean get(final KeyValue key, final int versions,
+ List<KeyValue> results, final NavigableSet<KeyValue> deletes,
+ final long now) {
this.lock.readLock().lock();
try {
- List<Cell> results;
- // The synchronizations here are because the below get iterates
- synchronized (this.memcache) {
- results = get(this.memcache, key, numVersions, deletes, now);
- }
- synchronized (this.snapshot) {
- results.addAll(results.size(),
- get(this.snapshot, key, numVersions - results.size(), deletes, now));
+ if (get(this.memcache, key, versions, results, deletes, now)) {
+ return true;
}
- return results;
+ return get(this.snapshot, key, versions , results, deletes, now);
} finally {
this.lock.readLock().unlock();
}
}
-
+
/**
+ * @param kv Find the row that comes after this one. If null, we return the
+ * first.
+ * @return Next row or null if none found.
+ */
+ KeyValue getNextRow(final KeyValue kv) {
+ this.lock.readLock().lock();
+ try {
+ return getLowest(getNextRow(kv, this.memcache),
+ getNextRow(kv, this.snapshot));
+ } finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ /*
* @param a
* @param b
* @return Return lowest of a or b or null if both a and b are null
*/
- private byte [] getLowest(final byte [] a,
- final byte [] b) {
+ private KeyValue getLowest(final KeyValue a, final KeyValue b) {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
- return this.rawcomparator.compareRows(a, b) <= 0? a: b;
+ return comparator.compareRows(a, b) <= 0? a: b;
}
- /**
- * @param row Find the row that comes after this one.
- * @return Next row or null if none found
- */
- byte [] getNextRow(final byte [] row) {
- this.lock.readLock().lock();
- try {
- return getLowest(getNextRow(row, this.memcache),
- getNextRow(row, this.snapshot));
- } finally {
- this.lock.readLock().unlock();
- }
- }
-
/*
- * @param row Find row that follows this one.
- * @param map Map to look in for a row beyond <code>row</code>.
- * This method synchronizes on passed map while iterating it.
- * @return Next row or null if none found.
- */
- private byte [] getNextRow(final byte [] row,
- final SortedMap<HStoreKey, byte []> map) {
- byte [] result = null;
- // Synchronize on the map to make the tailMap making 'safe'.
- synchronized (map) {
- // Make an HSK with maximum timestamp so we get past most of the current
- // rows cell entries.
- HStoreKey hsk = new HStoreKey(row, HConstants.LATEST_TIMESTAMP);
- SortedMap<HStoreKey, byte []> tailMap = map.tailMap(hsk);
- // Iterate until we fall into the next row; i.e. move off current row
- for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
- HStoreKey itKey = es.getKey();
- if (this.rawcomparator.compareRows(itKey.getRow(), row) <= 0)
- continue;
- // Note: Not suppressing deletes or expired cells.
- result = itKey.getRow();
- break;
- }
+ * @param kv Find row that follows this one. If null, return first.
+ * @param set Set to look in for a row beyond that of <code>kv</code>.
+ * @return First entry whose row sorts after that of <code>kv</code> (first
+ * entry of all when <code>kv</code> is null), or null if none found.
+ */
+ private KeyValue getNextRow(final KeyValue kv,
+ final NavigableSet<KeyValue> set) {
+ KeyValue result = null;
+ SortedSet<KeyValue> tailset = kv == null? set: set.tailSet(kv);
+ // Iterate until we move off the row of kv; a null kv has no row to skip.
+ for (KeyValue i : tailset) {
+ if (kv != null && comparator.compareRows(i, kv) <= 0)
+ continue;
+ // Note: Not suppressing deletes or expired cells. Needs to be handled
+ // by higher up functions.
+ result = i;
+ break;
}
return result;
}
@@ -314,225 +305,223 @@
/**
* Return all the available columns for the given key. The key indicates a
* row and timestamp, but not a column name.
- * @param key
+ * @param key Where to start searching. Specifies a row and timestamp.
+ * Columns are specified in following arguments.
* @param columns Pass null for all columns else the wanted subset.
+ * @param columnPattern Column pattern to match.
* @param numVersions number of versions to retrieve
- * @param deletes Map to accumulate deletes found.
+ * @param versionsCount Map of KV to Count. Uses a Comparator that doesn't
+ * look at timestamps so only Row/Column are compared.
+ * @param deletes Pass a Set that has a Comparator that ignores key type.
* @param results Where to stick row results found.
+ * @return True if we found enough results for passed <code>columns</code>
+ * and <code>numVersions</code>.
*/
- void getFull(HStoreKey key, Set<byte []> columns, int numVersions,
- Map<byte [], Long> deletes, Map<byte [], Cell> results) {
+ boolean getFull(final KeyValue key, NavigableSet<byte []> columns,
+ final Pattern columnPattern,
+ int numVersions, final Map<KeyValue, HRegion.Counter> versionsCount,
+ final NavigableSet<KeyValue> deletes,
+ final List<KeyValue> results, final long now) {
this.lock.readLock().lock();
try {
- // The synchronizations here are because internalGet iterates
- synchronized (this.memcache) {
- internalGetFull(this.memcache, key, columns, numVersions, deletes, results);
- }
- synchronized (this.snapshot) {
- internalGetFull(this.snapshot, key, columns, numVersions, deletes, results);
+ // Used to be synchronized but now with weak iteration, no longer needed.
+ if (getFull(this.memcache, key, columns, columnPattern, numVersions,
+ versionsCount, deletes, results, now)) {
+ // Has enough results.
+ return true;
}
+ return getFull(this.snapshot, key, columns, columnPattern, numVersions,
+ versionsCount, deletes, results, now);
} finally {
this.lock.readLock().unlock();
}
}
- private void internalGetFull(SortedMap<HStoreKey, byte[]> map, HStoreKey key,
- Set<byte []> columns, int numVersions, Map<byte [], Long> deletes,
- Map<byte [], Cell> results) {
- if (map.isEmpty() || key == null) {
- return;
- }
- List<HStoreKey> victims = new ArrayList<HStoreKey>();
- SortedMap<HStoreKey, byte[]> tailMap = map.tailMap(key);
- long now = System.currentTimeMillis();
- for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
- HStoreKey itKey = es.getKey();
- byte [] itCol = itKey.getColumn();
- Cell cell = results.get(itCol);
- if ((cell == null || cell.getNumValues() < numVersions) &&
- key.matchesWithoutColumn(itKey)) {
- if (columns == null || columns.contains(itKey.getColumn())) {
- byte [] val = tailMap.get(itKey);
- if (HLogEdit.isDeleted(val)) {
- if (!deletes.containsKey(itCol)
- || deletes.get(itCol).longValue() < itKey.getTimestamp()) {
- deletes.put(itCol, Long.valueOf(itKey.getTimestamp()));
- }
- } else if (!(deletes.containsKey(itCol)
- && deletes.get(itCol).longValue() >= itKey.getTimestamp())) {
- // Skip expired cells
- if (ttl == HConstants.FOREVER ||
- now < itKey.getTimestamp() + ttl) {
- if (cell == null) {
- results.put(itCol, new Cell(val, itKey.getTimestamp()));
- } else {
- cell.add(val, itKey.getTimestamp());
- }
- } else {
- addVictim(victims, itKey);
- }
- }
- }
- } else if (this.rawcomparator.compareRows(key.getRow(), itKey.getRow()) < 0) {
+ /*
+ * @param set
+ * @param target Where to start searching.
+ * @param columns
+ * @param versions
+ * @param versionCounter
+ * @param deletes Pass a Set that has a Comparator that ignores key type.
+ * @param keyvalues
+ * @return True if enough results found.
+ */
+ private boolean getFull(final ConcurrentSkipListSet<KeyValue> set,
+ final KeyValue target, final Set<byte []> columns,
+ final Pattern columnPattern,
+ final int versions, final Map<KeyValue, HRegion.Counter> versionCounter,
+ final NavigableSet<KeyValue> deletes, List<KeyValue> keyvalues,
+ final long now) {
+ boolean hasEnough = false;
+ if (target == null) {
+ return hasEnough;
+ }
+ NavigableSet<KeyValue> tailset = set.tailSet(target);
+ if (tailset == null || tailset.isEmpty()) {
+ return hasEnough;
+ }
+ // TODO: This loop same as in HStore.getFullFromStoreFile. Make sure they
+ // are the same.
+ for (KeyValue kv: tailset) {
+ // Make sure we have not passed out the row. If target key has a
+ // column on it, then we are looking explicit key+column combination. If
+ // we've passed it out, also break.
+ if (target.isEmptyColumn()? !this.comparator.matchingRows(target, kv):
+ !this.comparator.matchingRowColumn(target, kv)) {
+ break;
+ }
+ if (!Store.getFullCheck(this.comparator, target, kv, columns, columnPattern)) {
+ continue;
+ }
+ if (Store.doKeyValue(kv, versions, versionCounter, columns, deletes, now,
+ this.ttl, keyvalues, tailset)) {
+ hasEnough = true;
break;
}
}
- // Remove expired victims from the map.
- for (HStoreKey v: victims) {
- map.remove(v);
- }
+ return hasEnough;
}
/**
* @param row Row to look for.
* @param candidateKeys Map of candidate keys (Accumulation over lots of
* lookup over stores and memcaches)
- * @param deletes Deletes collected so far.
*/
- void getRowKeyAtOrBefore(final byte [] row,
- final SortedMap<HStoreKey, Long> candidateKeys) {
- getRowKeyAtOrBefore(row, candidateKeys, new HashSet<HStoreKey>());
+ void getRowKeyAtOrBefore(final KeyValue row,
+ final NavigableSet<KeyValue> candidateKeys) {
+ getRowKeyAtOrBefore(row, candidateKeys,
+ new TreeSet<KeyValue>(this.comparatorIgnoreType), System.currentTimeMillis());
}
-
+
/**
- * @param row Row to look for.
- * @param candidateKeys Map of candidate keys (Accumulation over lots of
- * lookup over stores and memcaches)
- * @param deletes Deletes collected so far.
- */
- void getRowKeyAtOrBefore(final byte [] row,
- final SortedMap<HStoreKey, Long> candidateKeys,
- final Set<HStoreKey> deletes) {
+ * @param kv Row to look for.
+ * @param candidates Map of candidate keys (Accumulation over lots of
+ * lookup over stores and memcaches). Pass a Set with a Comparator that
+ * ignores key Type so we can do Set.remove using a delete, i.e. a KeyValue
+ * with a different Type to the candidate key.
+ * @param deletes Pass a Set that has a Comparator that ignores key type.
+ */
+ void getRowKeyAtOrBefore(final KeyValue kv,
+ final NavigableSet<KeyValue> candidates,
+ final NavigableSet<KeyValue> deletes, final long now) {
this.lock.readLock().lock();
try {
- synchronized (memcache) {
- getRowKeyAtOrBefore(memcache, row, candidateKeys, deletes);
- }
- synchronized (snapshot) {
- getRowKeyAtOrBefore(snapshot, row, candidateKeys, deletes);
- }
+ getRowKeyAtOrBefore(memcache, kv, candidates, deletes, now);
+ getRowKeyAtOrBefore(snapshot, kv, candidates, deletes, now);
} finally {
this.lock.readLock().unlock();
}
}
- private void getRowKeyAtOrBefore(final SortedMap<HStoreKey, byte []> map,
- final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
- final Set<HStoreKey> deletes) {
+ private void getRowKeyAtOrBefore(final ConcurrentSkipListSet<KeyValue> set,
+ final KeyValue kv, final NavigableSet<KeyValue> candidates,
+ final NavigableSet<KeyValue> deletes, final long now) {
+ if (set.isEmpty()) {
+ return;
+ }
// We want the earliest possible to start searching from. Start before
// the candidate key in case it turns out a delete came in later.
- HStoreKey search_key = candidateKeys.isEmpty()?
- new HStoreKey(row):
- new HStoreKey(candidateKeys.firstKey().getRow());
- List<HStoreKey> victims = new ArrayList<HStoreKey>();
- long now = System.currentTimeMillis();
+ KeyValue search = candidates.isEmpty()? kv: candidates.first();
// Get all the entries that come equal or after our search key
- SortedMap<HStoreKey, byte []> tailMap = map.tailMap(search_key);
+ SortedSet<KeyValue> tailset = set.tailSet(search);
// if there are items in the tail map, there's either a direct match to
// the search key, or a range of values between the first candidate key
// and the ultimate search key (or the end of the cache)
- if (!tailMap.isEmpty() &&
- this.rawcomparator.compareRows(tailMap.firstKey().getRow(),
- search_key.getRow()) <= 0) {
- Iterator<HStoreKey> key_iterator = tailMap.keySet().iterator();
-
+ if (!tailset.isEmpty() &&
+ this.comparator.compareRows(tailset.first(), search) <= 0) {
// Keep looking at cells as long as they are no greater than the
// ultimate search key and there's still records left in the map.
- HStoreKey deletedOrExpiredRow = null;
- for (HStoreKey found_key = null; key_iterator.hasNext() &&
- (found_key == null ||
- this.rawcomparator.compareRows(found_key.getRow(), row) <= 0);) {
- found_key = key_iterator.next();
- if (this.rawcomparator.compareRows(found_key.getRow(), row) <= 0) {
- if (HLogEdit.isDeleted(tailMap.get(found_key))) {
- Store.handleDeleted(found_key, candidateKeys, deletes);
- if (deletedOrExpiredRow == null) {
- deletedOrExpiredRow = found_key;
+ KeyValue deleted = null;
+ KeyValue found = null;
+ for (Iterator<KeyValue> iterator = tailset.iterator();
+ iterator.hasNext() && (found == null ||
+ this.comparator.compareRows(found, kv) <= 0);) {
+ found = iterator.next();
+ if (this.comparator.compareRows(found, kv) <= 0) {
+ if (found.isDeleteType()) {
+ Store.handleDeletes(found, candidates, deletes);
+ if (deleted == null) {
+ deleted = found;
}
} else {
- if (Store.notExpiredAndNotInDeletes(this.ttl,
- found_key, now, deletes)) {
- candidateKeys.put(stripTimestamp(found_key),
- Long.valueOf(found_key.getTimestamp()));
+ if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
+ candidates.add(found);
} else {
- if (deletedOrExpiredRow == null) {
- deletedOrExpiredRow = new HStoreKey(found_key);
+ if (deleted == null) {
+ deleted = found;
}
- addVictim(victims, found_key);
+ // TODO: Check this removes the right key.
+ // It's expired. Remove it.
+ iterator.remove();
}
}
}
}
- if (candidateKeys.isEmpty() && deletedOrExpiredRow != null) {
- getRowKeyBefore(map, deletedOrExpiredRow, candidateKeys, victims,
- deletes, now);
+ if (candidates.isEmpty() && deleted != null) {
+ getRowKeyBefore(set, deleted, candidates, deletes, now);
}
} else {
// The tail didn't contain any keys that matched our criteria, or was
// empty. Examine all the keys that proceed our splitting point.
- getRowKeyBefore(map, search_key, candidateKeys, victims, deletes, now);
- }
- // Remove expired victims from the map.
- for (HStoreKey victim: victims) {
- map.remove(victim);
+ getRowKeyBefore(set, search, candidates, deletes, now);
}
}
-
+
/*
* Get row key that comes before passed <code>search_key</code>
* Use when we know search_key is not in the map and we need to search
* earlier in the cache.
- * @param map
- * @param search_key
- * @param candidateKeys
- * @param victims
- */
- private void getRowKeyBefore(SortedMap<HStoreKey, byte []> map,
- HStoreKey search_key, SortedMap<HStoreKey, Long> candidateKeys,
- final List<HStoreKey> expires, final Set<HStoreKey> deletes,
- final long now) {
- SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
+ * @param set Set to search; only entries sorting before search are examined.
+ * @param search Key to search before.
+ * @param candidates Accumulates the candidate keys found.
+ * @param deletes Pass a Set that has a Comparator that ignores key type.
+ * @param now Current time in milliseconds; used for the TTL check.
+ */
+ private void getRowKeyBefore(ConcurrentSkipListSet<KeyValue> set,
+ KeyValue search, NavigableSet<KeyValue> candidates,
+ final NavigableSet<KeyValue> deletes, final long now) {
+ NavigableSet<KeyValue> headSet = set.headSet(search);
// If we tried to create a headMap and got an empty map, then there are
// no keys at or before the search key, so we're done.
- if (headMap.isEmpty()) {
+ if (headSet.isEmpty()) {
return;
}
// If there aren't any candidate keys at this point, we need to search
// backwards until we find at least one candidate or run out of headMap.
- if (candidateKeys.isEmpty()) {
- Set<HStoreKey> keys = headMap.keySet();
- HStoreKey [] cells = keys.toArray(new HStoreKey[keys.size()]);
- byte [] lastRowFound = null;
- for (int i = cells.length - 1; i >= 0; i--) {
- HStoreKey found_key = cells[i];
+ if (candidates.isEmpty()) {
+ KeyValue lastFound = null;
+ for (Iterator<KeyValue> i = headSet.descendingIterator(); i.hasNext();) {
+ KeyValue found = i.next();
// if the last row we found a candidate key for is different than
// the row of the current candidate, we can stop looking -- if its
// not a delete record.
- boolean deleted = HLogEdit.isDeleted(headMap.get(found_key));
- if (lastRowFound != null &&
- !HStoreKey.equalsTwoRowKeys(lastRowFound, found_key.getRow()) &&
- !deleted) {
+ boolean deleted = found.isDeleteType();
+ if (lastFound != null &&
+ !this.comparator.matchingRows(lastFound, found) && !deleted) {
break;
}
// If this isn't a delete, record it as a candidate key. Also
- // take note of the row of this candidate so that we'll know when
+ // take note of this candidate so that we'll know when
// we cross the row boundary into the previous row.
if (!deleted) {
- if (Store.notExpiredAndNotInDeletes(this.ttl, found_key, now, deletes)) {
- lastRowFound = found_key.getRow();
- candidateKeys.put(stripTimestamp(found_key),
- Long.valueOf(found_key.getTimestamp()));
+ if (Store.notExpiredAndNotInDeletes(this.ttl, found, now, deletes)) {
+ lastFound = found;
+ candidates.add(found);
} else {
- expires.add(found_key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("getRowKeyBefore: " + found_key + ": expired, skipped");
- }
+ // It's expired.
+ Store.expiredOrDeleted(set, found);
}
} else {
- deletes.add(found_key);
+ // We are encountering items in reverse. We may have just added
+ // an item to candidates that this later item deletes. Check. If we
+ // found something in candidates, remove it from the set.
+ if (Store.handleDeletes(found, candidates, deletes)) {
+ remove(set, found);
+ }
}
}
} else {
@@ -540,223 +529,94 @@
// the very last row's worth of keys in the headMap, because any
// smaller acceptable candidate keys would have caused us to start
// our search earlier in the list, and we wouldn't be searching here.
- SortedMap<HStoreKey, byte[]> thisRowTailMap =
- headMap.tailMap(new HStoreKey(headMap.lastKey().getRow()));
- Iterator<HStoreKey> key_iterator = thisRowTailMap.keySet().iterator();
+ SortedSet<KeyValue> rowTailMap =
+ headSet.tailSet(headSet.last().cloneRow(HConstants.LATEST_TIMESTAMP));
+ Iterator<KeyValue> i = rowTailMap.iterator();
do {
- HStoreKey found_key = key_iterator.next();
- if (HLogEdit.isDeleted(thisRowTailMap.get(found_key))) {
- Store.handleDeleted(found_key, candidateKeys, deletes);
+ KeyValue found = i.next();
+ if (found.isDeleteType()) {
+ Store.handleDeletes(found, candidates, deletes);
} else {
if (ttl == HConstants.FOREVER ||
- now < found_key.getTimestamp() + ttl ||
- !deletes.contains(found_key)) {
- candidateKeys.put(stripTimestamp(found_key),
- Long.valueOf(found_key.getTimestamp()));
+ now < found.getTimestamp() + ttl ||
+ !deletes.contains(found)) {
+ candidates.add(found);
} else {
- expires.add(found_key);
- if (LOG.isDebugEnabled()) {
- LOG.debug("internalGetRowKeyAtOrBefore: " + found_key +
- ": expired, skipped");
- }
+ Store.expiredOrDeleted(set, found);
}
}
- } while (key_iterator.hasNext());
+ } while (i.hasNext());
}
}
-
- static HStoreKey stripTimestamp(HStoreKey key) {
- return new HStoreKey(key.getRow(), key.getColumn());
- }
-
+
/*
* Examine a single map for the desired key.
*
* TODO - This is kinda slow. We need a data structure that allows for
* proximity-searches, not just precise-matches.
*
- * @param map
+ * @param set Set to search.
* @param key
- * @param numVersions
- * @param deletes
- * @return Ordered list of items found in passed <code>map</code>. If no
- * matching values, returns an empty list (does not return null).
+ * @param versions Versions wanted; handed to Store.doKeyValue.
+ * @param keyvalues Handed to Store.doKeyValue; presumably where matches go.
+ * @param deletes Pass a Set that has a Comparator that ignores key type.
+ * @param now Current time; handed to Store.doKeyValue along with the TTL.
+ * @return True if Store.doKeyValue reported enough versions, false if we ran
+ * out of entries matching the row/column of <code>key</code> first.
*/
- private ArrayList<Cell> get(final SortedMap<HStoreKey, byte []> map,
- final HStoreKey key, final int numVersions, final Set<HStoreKey> deletes,
+ private boolean get(final ConcurrentSkipListSet<KeyValue> set,
+ final KeyValue key, final int versions,
+ final List<KeyValue> keyvalues,
+ final NavigableSet<KeyValue> deletes,
final long now) {
- ArrayList<Cell> result = new ArrayList<Cell>();
- List<HStoreKey> victims = new ArrayList<HStoreKey>();
- SortedMap<HStoreKey, byte[]> tailMap = map.tailMap(key);
- for (Map.Entry<HStoreKey, byte[]> es : tailMap.entrySet()) {
- HStoreKey itKey = es.getKey();
- if (itKey.matchesRowCol(key)) {
- if (!isDeleted(es.getValue())) {
- // Filter out expired results
- if (Store.notExpiredAndNotInDeletes(ttl, itKey, now, deletes)) {
- result.add(new Cell(tailMap.get(itKey), itKey.getTimestamp()));
- if (numVersions > 0 && result.size() >= numVersions) {
- break;
- }
- } else {
- addVictim(victims, itKey);
- }
- } else {
- // Cell holds a delete value.
- deletes.add(itKey);
+ NavigableSet<KeyValue> tailset = set.tailSet(key);
+ if (tailset.isEmpty()) {
+ return false;
+ }
+ // Set is sorted, so entries matching key's row/column are contiguous.
+ for (KeyValue kv : tailset) {
+ if (this.comparator.matchingRowColumn(kv, key)) {
+ if (Store.doKeyValue(kv, versions, deletes, now, this.ttl, keyvalues,
+ tailset)) {
+ return true;
}
} else {
// By L.N. HBASE-684, map is sorted, so we can't find match any more.
break;
}
}
- // Remove expired victims from the map.
- for (HStoreKey v: victims) {
- map.remove(v);
- }
- return result;
- }
-
- /*
- * Add <code>key</code> to the list of 'victims'.
- * @param victims
- * @param key
- */
- private void addVictim(final List<HStoreKey> victims, final HStoreKey key) {
- victims.add(key);
- if (LOG.isDebugEnabled()) {
- LOG.debug(key + ": expired or in deletes, skipped");
- }
- }
-
- /**
- * Get <code>versions</code> keys matching the origin key's
- * row/column/timestamp and those of an older vintage.
- * @param origin Where to start searching.
- * @param versions How many versions to return. Pass
- * {@link HConstants.ALL_VERSIONS} to retrieve all.
- * @param now
- * @param deletes Accumulating list of deletes
- * @param columnPattern regex pattern for column matching. if columnPattern
- * is not null, we use column pattern to match columns. And the columnPattern
- * only works when origin's column is null or its length is zero.
- * @return Ordered list of <code>versions</code> keys going from newest back.
- * @throws IOException
- */
- List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
- final Set<HStoreKey> deletes, final long now,
- final Pattern columnPattern) {
- this.lock.readLock().lock();
- try {
- List<HStoreKey> results;
- synchronized (memcache) {
- results =
- getKeys(this.memcache, origin, versions, deletes, now, columnPattern);
- }
- synchronized (snapshot) {
- results.addAll(results.size(), getKeys(snapshot, origin,
- versions == HConstants.ALL_VERSIONS ? versions :
- (versions - results.size()), deletes, now, columnPattern));
- }
- return results;
- } finally {
- this.lock.readLock().unlock();
- }
+ return false;
}
/*
- * @param origin Where to start searching.
- * @param versions How many versions to return. Pass
- * {@link HConstants.ALL_VERSIONS} to retrieve all.
- * @param now
- * @param deletes
- * @param columnPattern regex pattern for column matching. if columnPattern
- * is not null, we use column pattern to match columns. And the columnPattern
- * only works when origin's column is null or its length is zero.
- * @return List of all keys that are of the same row and column and of
- * equal or older timestamp. If no keys, returns an empty List. Does not
- * return null.
- */
- private List<HStoreKey> getKeys(final SortedMap<HStoreKey,
- byte []> map, final HStoreKey origin, final int versions,
- final Set<HStoreKey> deletes, final long now,
- final Pattern columnPattern) {
- List<HStoreKey> result = new ArrayList<HStoreKey>();
- List<HStoreKey> victims = new ArrayList<HStoreKey>();
- SortedMap<HStoreKey, byte []> tailMap = map.tailMap(origin);
- for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
- HStoreKey key = es.getKey();
- // if there's no column name, then compare rows and timestamps
- if (origin.getColumn() != null && origin.getColumn().length == 0) {
- // if the current and origin row don't match, then we can jump
- // out of the loop entirely.
- if (!HStoreKey.equalsTwoRowKeys( key.getRow(), origin.getRow())) {
- break;
- }
- // if the column pattern is not null, we use it for column matching.
- // we will skip the keys whose column doesn't match the pattern.
- if (columnPattern != null) {
- if (!(columnPattern.matcher(Bytes.toString(key.getColumn())).matches())) {
- continue;
- }
- }
- // if the rows match but the timestamp is newer, skip it so we can
- // get to the ones we actually want.
- if (key.getTimestamp() > origin.getTimestamp()) {
- continue;
- }
- } else { // compare rows and columns
- // if the key doesn't match the row and column, then we're done, since
- // all the cells are ordered.
- if (!key.matchesRowCol(origin)) {
- break;
- }
- }
- if (!isDeleted(es.getValue())) {
- if (Store.notExpiredAndNotInDeletes(this.ttl, key, now, deletes)) {
- result.add(key);
- if (versions > 0 && result.size() >= versions) {
- break;
- }
- } else {
- addVictim(victims, key);
- }
- } else {
- // Delete
- deletes.add(key);
+ * @param set Set to prune; removal goes through its tailSet(kv) view.
+ * @param kv This is a delete record. Remove anything behind this of same
+ * r/c/ts.
+ * @return True if we removed anything.
+ */
+ private boolean remove(final NavigableSet<KeyValue> set, final KeyValue kv) {
+ SortedSet<KeyValue> s = set.tailSet(kv);
+ if (s.isEmpty()) {
+ return false;
+ }
+ boolean removed = false;
+ for (Iterator<KeyValue> i = s.iterator(); i.hasNext();) {
+ if (this.comparatorIgnoreType.compare(i.next(), kv) == 0) {
+ // Same r/c/ts. Remove through the iterator, not the set being walked.
+ i.remove();
+ removed = true;
+ continue;
}
+ break;
+ }
- // Clean expired victims from the map.
- for (HStoreKey v: victims) {
- map.remove(v);
- }
- return result;
- }
-
- /**
- * @param key
- * @return True if an entry and its content is {@link HGlobals.deleteBytes}.
- * Use checking values in store. On occasion the memcache has the fact that
- * the cell has been deleted.
- */
- boolean isDeleted(final HStoreKey key) {
- return isDeleted(this.memcache.get(key)) ||
- (this.snapshot != null && isDeleted(this.snapshot.get(key)));
- }
-
- /*
- * @param b Cell value.
- * @return True if this is a delete value.
- */
- private boolean isDeleted(final byte [] b) {
- return HLogEdit.isDeleted(b);
+ return removed;
}
/**
* @return a scanner over the keys in the Memcache
*/
InternalScanner getScanner(long timestamp,
- final byte [][] targetCols, final byte [] firstRow)
+ final NavigableSet<byte []> targetCols, final byte [] firstRow)
throws IOException {
this.lock.readLock().lock();
try {
@@ -772,89 +632,71 @@
//////////////////////////////////////////////////////////////////////////////
private class MemcacheScanner extends HAbstractScanner {
- private byte [] currentRow;
- private Set<byte []> columns = null;
+ private KeyValue current;
+ private final NavigableSet<byte []> columns;
+ private final NavigableSet<KeyValue> deletes;
+ private final Map<KeyValue, Counter> versionCounter;
+ private final long now = System.currentTimeMillis();
- MemcacheScanner(final long timestamp, final byte [] targetCols[],
+ MemcacheScanner(final long timestamp, final NavigableSet<byte []> columns,
final byte [] firstRow)
throws IOException {
// Call to super will create ColumnMatchers and whether this is a regex
// scanner or not. Will also save away timestamp. Also sorts rows.
- super(timestamp, targetCols);
- this.currentRow = firstRow;
+ super(timestamp, columns);
+ this.deletes = new TreeSet<KeyValue>(comparatorIgnoreType);
+ this.versionCounter =
+ new TreeMap<KeyValue, Counter>(comparatorIgnoreTimestamp);
+ this.current = KeyValue.createFirstOnRow(firstRow, timestamp);
// If we're being asked to scan explicit columns rather than all in
// a family or columns that match regexes, cache the sorted array of
// columns.
- this.columns = null;
- if (!isWildcardScanner()) {
- this.columns = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
- for (int i = 0; i < targetCols.length; i++) {
- this.columns.add(targetCols[i]);
- }
- }
+ this.columns = isWildcardScanner()? null: columns;
}
@Override
- public boolean next(HStoreKey key, SortedMap<byte [], Cell> results)
+ public boolean next(final List<KeyValue> keyvalues)
throws IOException {
if (this.scannerClosed) {
return false;
}
- // This is a treemap rather than a Hashmap because then I can have a
- // byte array as key -- because I can independently specify a comparator.
- Map<byte [], Long> deletes =
- new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
- // Catch all row results in here. These results are ten filtered to
- // ensure they match column name regexes, or if none, added to results.
- Map<byte [], Cell> rowResults =
- new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
- if (results.size() > 0) {
- results.clear();
- }
- long latestTimestamp = -1;
- while (results.size() <= 0 && this.currentRow != null) {
- if (deletes.size() > 0) {
+ while (keyvalues.isEmpty() && this.current != null) {
+ // Deletes are per row.
+ if (!deletes.isEmpty()) {
deletes.clear();
}
- if (rowResults.size() > 0) {
- rowResults.clear();
- }
- key.setRow(this.currentRow);
- key.setVersion(this.timestamp);
- getFull(key, isWildcardScanner() ? null : this.columns, 1, deletes,
- rowResults);
- for (Map.Entry<byte [], Long> e: deletes.entrySet()) {
- rowResults.put(e.getKey(),
- new Cell(HLogEdit.DELETED_BYTES, e.getValue().longValue()));
+ if (!versionCounter.isEmpty()) {
+ versionCounter.clear();
}
- for (Map.Entry<byte [], Cell> e: rowResults.entrySet()) {
- byte [] column = e.getKey();
- Cell c = e.getValue();
+ // The getFull will take care of expired and deletes inside memcache.
+ // The first getFull when row is the special empty bytes will return
+ // nothing so we go around again. Alternative is calling a getNextRow
+ // if row is null but that looks like it would take same amount of work
+ // so leave it for now.
+ getFull(this.current, isWildcardScanner()? null: this.columns, null, 1,
+ versionCounter, deletes, keyvalues, this.now);
+ // Explicit Iterator: entries are dropped from keyvalues while walking it.
+ for (Iterator<KeyValue> i = keyvalues.iterator(); i.hasNext();) {
+ KeyValue bb = i.next();
if (isWildcardScanner()) {
// Check the results match. We only check columns, not timestamps.
// We presume that timestamps have been handled properly when we
// called getFull.
- if (!columnMatch(column)) {
- continue;
+ if (!columnMatch(bb)) {
+ i.remove();
}
}
- // We should never return HConstants.LATEST_TIMESTAMP as the time for
- // the row. As a compromise, we return the largest timestamp for the
- // entries that we find that match.
- if (c.getTimestamp() != HConstants.LATEST_TIMESTAMP &&
- c.getTimestamp() > latestTimestamp) {
- latestTimestamp = c.getTimestamp();
- }
- results.put(column, c);
}
- this.currentRow = getNextRow(this.currentRow);
- }
- // Set the timestamp to the largest one for the row if we would otherwise
- // return HConstants.LATEST_TIMESTAMP
- if (key.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
- key.setVersion(latestTimestamp);
+ // Add any deletes found so they are available to the StoreScanner#next.
+ keyvalues.addAll(deletes);
+ this.current = getNextRow(this.current);
+ // Change current to be column-less and to have the scanners' now. We
+ // do this because first item on 'next row' may not have the scanners'
+ // now time which will cause trouble down in getFull; same reason no
+ // column.
+ if (this.current != null) this.current = this.current.cloneRow(this.now);
}
- return results.size() > 0;
+ return !keyvalues.isEmpty();
}
public void close() {
@@ -871,8 +713,10 @@
* allows you get 'deep size' on objects.
* @param args
* @throws InterruptedException
+ * @throws IOException
*/
- public static void main(String [] args) throws InterruptedException {
+ public static void main(String [] args)
+ throws InterruptedException, IOException {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
LOG.info("vmName=" + runtime.getVmName() + ", vmVendor=" +
runtime.getVmVendor() + ", vmVersion=" + runtime.getVmVersion());
@@ -881,28 +725,27 @@
// TODO: x32 vs x64
long size = 0;
final int count = 10000;
+ byte [] column = Bytes.toBytes("col:umn");
for (int i = 0; i < count; i++) {
- size += memcache1.add(new HStoreKey(Bytes.toBytes(i)),
- HConstants.EMPTY_BYTE_ARRAY);
+ // Give each its own ts
+ size += memcache1.add(new KeyValue(Bytes.toBytes(i), column, i));
}
LOG.info("memcache1 estimated size=" + size);
for (int i = 0; i < count; i++) {
- size += memcache1.add(new HStoreKey(Bytes.toBytes(i)),
- HConstants.EMPTY_BYTE_ARRAY);
+ size += memcache1.add(new KeyValue(Bytes.toBytes(i), column, i));
}
LOG.info("memcache1 estimated size (2nd loading of same data)=" + size);
// Make a variably sized memcache.
Memcache memcache2 = new Memcache();
for (int i = 0; i < count; i++) {
- byte [] b = Bytes.toBytes(i);
- size += memcache2.add(new HStoreKey(b, b),
- new byte [i]);
+ size += memcache2.add(new KeyValue(Bytes.toBytes(i), column, i,
+ new byte[i]));
}
LOG.info("memcache2 estimated size=" + size);
final int seconds = 30;
LOG.info("Waiting " + seconds + " seconds while heap dump is taken");
for (int i = 0; i < seconds; i++) {
- Thread.sleep(1000);
+ // Thread.sleep(1000);
}
LOG.info("Exiting.");
}