You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/08/18 00:21:00 UTC
svn commit: r805183 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/regionserver/
src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/io/
src/test/org/apache/hadoop/hbase/regionserver/
Author: stack
Date: Mon Aug 17 22:21:00 2009
New Revision: 805183
URL: http://svn.apache.org/viewvc?rev=805183&view=rev
Log:
HBASE-1738 Scanner doesn't reset when a snapshot is created, could miss new updates into the 'kvset' (active part)
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Aug 17 22:21:00 2009
@@ -337,6 +337,8 @@
storefile problems
HBASE-1761 getclosest doesn't understand delete family; manifests as
"HRegionInfo was null or empty in .META" A.K.A the BS problem
+ HBASE-1738 Scanner doesn't reset when a snapshot is created, could miss
+ new updates into the 'kvset' (active part)
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemStore.java Mon Aug 17 22:21:00 2009
@@ -28,7 +28,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.SortedSet;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -78,6 +80,10 @@
// Used to track own heapSize
final AtomicLong size;
+ // Observers told when memstore changes; CopyOnWriteArraySet is thread-safe.
+ final CopyOnWriteArraySet<ChangedMemStoreObserver> changedMemStoreObservers =
+ new CopyOnWriteArraySet<ChangedMemStoreObserver>();
+
/**
* Default constructor. Used for tests.
*/
@@ -123,12 +129,10 @@
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
- // We used to synchronize on the memstore here but we're inside a
- // write lock so removed it. Comment is left in case removal was a
- // mistake. St.Ack
if (!this.kvset.isEmpty()) {
this.snapshot = this.kvset;
this.kvset = new KeyValueSkipListSet(this.comparator);
+ tellChangedMemStoreObservers();
// Reset heap to not include any keys
this.size.set(DEEP_OVERHEAD);
}
@@ -138,6 +142,15 @@
}
}
+ /*
+ * Tell outstanding scanners that memstore has changed.
+ */
+ private void tellChangedMemStoreObservers() {
+ for (ChangedMemStoreObserver o: this.changedMemStoreObservers) {
+ o.changedMemStore();
+ }
+ }
+
/**
* Return the current snapshot.
* Called by flusher to get current snapshot made by a previous
@@ -168,6 +181,7 @@
// create a new snapshot and let the old one go.
if (!ss.isEmpty()) {
this.snapshot = new KeyValueSkipListSet(this.comparator);
+ tellChangedMemStoreObservers();
}
} finally {
this.lock.writeLock().unlock();
@@ -445,9 +459,8 @@
KeyValueScanner [] getScanners() {
this.lock.readLock().lock();
try {
- KeyValueScanner [] scanners = new KeyValueScanner[2];
- scanners[0] = new MemStoreScanner(this.kvset);
- scanners[1] = new MemStoreScanner(this.snapshot);
+ KeyValueScanner [] scanners = new KeyValueScanner[1];
+ scanners[0] = new MemStoreScanner(this.changedMemStoreObservers);
return scanners;
} finally {
this.lock.readLock().unlock();
@@ -521,18 +534,22 @@
/*
* MemStoreScanner implements the KeyValueScanner.
- * It lets the caller scan the contents of a memstore.
- * This behaves as if it were a real scanner but does not maintain position
- * in the passed memstore tree.
- */
- protected class MemStoreScanner implements KeyValueScanner {
- private final NavigableSet<KeyValue> kvs;
- private KeyValue current = null;
+ * It lets the caller scan the contents of a memstore -- both current
+ * map and snapshot.
+ * This behaves as if it were a real scanner but does not maintain position.
+ */
+ protected class MemStoreScanner implements KeyValueScanner, ChangedMemStoreObserver {
private List<KeyValue> result = new ArrayList<KeyValue>();
private int idx = 0;
-
- MemStoreScanner(final NavigableSet<KeyValue> s) {
- this.kvs = s;
+ // Make access atomic.
+ private FirstOnRow firstOnNextRow = new FirstOnRow();
+ // Keep reference to Set so can remove myself when closed.
+ private final Set<ChangedMemStoreObserver> observers;
+
+ MemStoreScanner(final Set<ChangedMemStoreObserver> observers) {
+ super();
+ this.observers = observers;
+ this.observers.add(this);
}
public boolean seek(KeyValue key) {
@@ -541,7 +558,7 @@
close();
return false;
}
- this.current = key;
+ this.firstOnNextRow.set(key);
return cacheNextRow();
} catch(Exception e) {
close();
@@ -570,47 +587,117 @@
}
/**
- * @return True if we successfully cached a NavigableSet aligned on
- * next row.
+ * @return True if successfully cached a next row.
*/
boolean cacheNextRow() {
- SortedSet<KeyValue> keys;
+ // Prevent snapshot being cleared while caching a row.
+ lock.readLock().lock();
+ this.result.clear();
+ this.idx = 0;
try {
- keys = this.kvs.tailSet(this.current);
- } catch (Exception e) {
- close();
- return false;
- }
- if (keys == null || keys.isEmpty()) {
- close();
- return false;
+ // Look at each set, kvset and snapshot.
+ // Both look for entries on the row at this.firstOnNextRow, returning
+ // what they have as the first KeyValue on the following row (or null
+ // if there is nothing in the set or if nothing follows that
+ // row).
+ KeyValue kvsetNextRow = cacheNextRow(kvset);
+ KeyValue snapshotNextRow = cacheNextRow(snapshot);
+ if (kvsetNextRow == null && snapshotNextRow == null) {
+ // Nothing more in memstore but we might have gotten current row
+ // results
+ // Indicate at end of store by setting next row to null.
+ this.firstOnNextRow.set(null);
+ return !this.result.isEmpty();
+ } else if (kvsetNextRow != null && snapshotNextRow != null) {
+ // Set current at the lowest of the two values.
+ int compare = comparator.compare(kvsetNextRow, snapshotNextRow);
+ this.firstOnNextRow.set(compare <= 0? kvsetNextRow: snapshotNextRow);
+ } else {
+ this.firstOnNextRow.set(kvsetNextRow != null? kvsetNextRow: snapshotNextRow);
+ }
+ return true;
+ } finally {
+ lock.readLock().unlock();
}
- this.current = null;
- byte [] row = keys.first().getRow();
- for (KeyValue kv: keys) {
- if (comparator.compareRows(kv, row) != 0) {
- this.current = kv;
+ }
+
+ /*
+ * See if set has entries for the <code>this.firstOnNextRow</code> row. If so,
+ * add them to <code>this.result</code>.
+ * @param set Set to examine
+ * @return Next row in passed <code>set</code> or null if nothing in this
+ * passed <code>set</code>
+ */
+ private KeyValue cacheNextRow(final NavigableSet<KeyValue> set) {
+ if (this.firstOnNextRow.get() == null || set.isEmpty()) return null;
+ SortedSet<KeyValue> tail = set.tailSet(this.firstOnNextRow.get());
+ if (tail == null || tail.isEmpty()) return null;
+ KeyValue first = tail.first();
+ KeyValue nextRow = null;
+ for (KeyValue kv: tail) {
+ if (comparator.compareRows(first, kv) != 0) {
+ nextRow = kv;
break;
}
- result.add(kv);
+ this.result.add(kv);
}
- return true;
+ return nextRow;
}
public void close() {
- current = null;
+ this.firstOnNextRow.set(null);
idx = 0;
if (!result.isEmpty()) {
result.clear();
}
+ this.observers.remove(this);
+ }
+
+ public void changedMemStore() {
+ this.firstOnNextRow.reset();
}
}
-
+
+ /*
+ * Private class that holds firstOnRow and utility.
+ * Usually firstOnRow is the first KeyValue we find on next row rather than
+ * the absolute minimal first key (empty column, Type.Maximum, maximum ts).
+ * Usually it's ok being sloppy with firstOnRow letting it be the first thing
+ * found on next row -- this works -- but if the memstore changes on us, reset
+ * firstOnRow to be the ultimate firstOnRow. We play sloppy with firstOnRow
+ * usually so we don't have to allocate a new KeyValue each time firstOnRow
+ * is updated.
+ */
+ private static class FirstOnRow {
+ private KeyValue firstOnRow = null;
+
+ FirstOnRow() {
+ super();
+ }
+
+ synchronized void set(final KeyValue kv) {
+ this.firstOnRow = kv;
+ }
+
+ /* Reset firstOnRow to a 'clean', absolute firstOnRow.
+ */
+ synchronized void reset() {
+ if (this.firstOnRow == null) return;
+ this.firstOnRow =
+ new KeyValue(this.firstOnRow.getRow(), HConstants.LATEST_TIMESTAMP);
+ }
+
+ synchronized KeyValue get() {
+ return this.firstOnRow;
+ }
+ }
+
public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
+ ClassSize.OBJECT + (8 * ClassSize.REFERENCE));
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
+ ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +
(2 * ClassSize.CONCURRENT_SKIPLISTMAP));
/*
@@ -682,4 +769,16 @@
}
LOG.info("Exiting.");
}
-}
+
+ /**
+ * Observers want to know about MemStore changes.
+ * Called when snapshot is cleared and when we make one.
+ */
+ interface ChangedMemStoreObserver {
+ /**
+ * Notify this observer that the MemStore has changed.
+ * Declares no checked exceptions.
+ */
+ void changedMemStore();
+ }
+}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/ClassSize.java Mon Aug 17 22:21:00 2009
@@ -92,6 +92,12 @@
/** Overhead for AtomicBoolean */
public static int ATOMIC_BOOLEAN = 0;
+ /** Overhead for CopyOnWriteArraySet */
+ public static int COPYONWRITE_ARRAYSET = 0;
+
+ /** Overhead for CopyOnWriteArrayList */
+ public static int COPYONWRITE_ARRAYLIST = 0;
+
private static final String THIRTY_TWO = "32";
/**
@@ -151,6 +157,9 @@
ATOMIC_BOOLEAN = align(OBJECT + Bytes.SIZEOF_BOOLEAN);
+ COPYONWRITE_ARRAYSET = align(OBJECT + REFERENCE);
+
+ COPYONWRITE_ARRAYLIST = align(OBJECT + (2 * REFERENCE) + ARRAY);
}
/**
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/io/TestHeapSize.java Mon Aug 17 22:21:00 2009
@@ -6,6 +6,8 @@
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -164,6 +166,25 @@
assertEquals(expected, actual);
}
+ // CopyOnWriteArraySet
+ cl = CopyOnWriteArraySet.class;
+ expected = ClassSize.estimateBase(cl, false);
+ actual = ClassSize.COPYONWRITE_ARRAYSET;
+ if(expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ assertEquals(expected, actual);
+ }
+
+ // CopyOnWriteArrayList
+ cl = CopyOnWriteArrayList.class;
+ expected = ClassSize.estimateBase(cl, false);
+ actual = ClassSize.COPYONWRITE_ARRAYLIST;
+ if(expected != actual) {
+ ClassSize.estimateBase(cl, true);
+ assertEquals(expected, actual);
+ }
+
+
}
/**
@@ -240,11 +261,15 @@
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
expected += ClassSize.estimateBase(ConcurrentSkipListMap.class, false);
+ expected += ClassSize.estimateBase(CopyOnWriteArraySet.class, false);
+ expected += ClassSize.estimateBase(CopyOnWriteArrayList.class, false);
if(expected != actual) {
ClassSize.estimateBase(cl, true);
ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(ConcurrentSkipListMap.class, true);
+ ClassSize.estimateBase(CopyOnWriteArraySet.class, true);
+ ClassSize.estimateBase(CopyOnWriteArrayList.class, true);
assertEquals(expected, actual);
}
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=805183&r1=805182&r2=805183&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/regionserver/TestMemStore.java Mon Aug 17 22:21:00 2009
@@ -90,6 +90,10 @@
s.close();
}
assertEquals(rowCount, count);
+ for (int i = 0; i < memstorescanners.length; i++) {
+ memstorescanners[0].close();
+ }
+ memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
this.memstore.comparator, null, memstorescanners);
@@ -112,6 +116,10 @@
s.close();
}
assertEquals(rowCount, count);
+ for (int i = 0; i < memstorescanners.length; i++) {
+ memstorescanners[0].close();
+ }
+ memstorescanners = this.memstore.getScanners();
// Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis();
s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
@@ -124,8 +132,7 @@
// Assert the stuff is coming out in right order.
assertTrue(Bytes.compareTo(Bytes.toBytes(count), result.get(0).getRow()) == 0);
// Row count is same as column count.
- // TODO PUTBACK assertEquals("count=" + count + ", result=" + result,
- // rowCount, result.size());
+ assertEquals("count=" + count + ", result=" + result, rowCount, result.size());
count++;
if (count == snapshotIndex) {
this.memstore.snapshot();
@@ -407,8 +414,7 @@
assertEquals(expected.get(i), result.get(i));
}
}
-
-
+
//////////////////////////////////////////////////////////////////////////////
// Delete tests
//////////////////////////////////////////////////////////////////////////////
@@ -637,4 +643,4 @@
return new KeyValue(row, Bytes.toBytes("test_col:"),
HConstants.LATEST_TIMESTAMP, value);
}
-}
\ No newline at end of file
+}