You are viewing a plain text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/11/14 23:06:34 UTC
svn commit: r1542104 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
main/java/org/apache/hadoop/hbase/regionserver/Store.java
test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
Author: larsh
Date: Thu Nov 14 22:06:34 2013
New Revision: 1542104
URL: http://svn.apache.org/r1542104
Log:
HBASE-9963 Remove the ReentrantReadWriteLock in the MemStore (Nicolas Liochon)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1542104&r1=1542103&r2=1542104&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Nov 14 22:06:34 2013
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,6 +50,10 @@ import org.apache.hadoop.hbase.util.Envi
* to snapshot and is cleared. We continue to serve edits out of new memstore
* and backing snapshot until flusher reports in that the flush succeeded. At
* this point we let the snapshot go.
+ * <p>
+ * MemStore does not do its own locking: callers must hold the appropriate read
+ * or write lock around every call. This is done in {@link Store}.
+ * </p>
* TODO: Adjust size of the memstore when we remove items because they have
* been deleted.
* TODO: With new KVSLS, need to make sure we update HeapSize with difference
@@ -75,8 +78,6 @@ public class MemStore implements HeapSiz
// Snapshot of memstore. Made for flusher.
volatile KeyValueSkipListSet snapshot;
- final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
final KeyValue.KVComparator comparator;
// Used comparing versions -- same r/c and ts but different type.
@@ -143,30 +144,25 @@ public class MemStore implements HeapSiz
* To get the snapshot made by this method, use {@link #getSnapshot()}
*/
void snapshot() {
- this.lock.writeLock().lock();
- try {
- // If snapshot currently has entries, then flusher failed or didn't call
- // cleanup. Log a warning.
- if (!this.snapshot.isEmpty()) {
- LOG.warn("Snapshot called again without clearing previous. " +
- "Doing nothing. Another ongoing flush or did we fail last attempt?");
- } else {
- if (!this.kvset.isEmpty()) {
- this.snapshot = this.kvset;
- this.kvset = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = this.timeRangeTracker;
- this.timeRangeTracker = new TimeRangeTracker();
- // Reset heap to not include any keys
- this.size.set(DEEP_OVERHEAD);
- // Reset allocator so we get a fresh buffer for the new memstore
- if (allocator != null) {
- this.allocator = new MemStoreLAB(conf);
- }
- timeOfOldestEdit = Long.MAX_VALUE;
+ // If snapshot currently has entries, then flusher failed or didn't call
+ // cleanup. Log a warning.
+ if (!this.snapshot.isEmpty()) {
+ LOG.warn("Snapshot called again without clearing previous. " +
+ "Doing nothing. Another ongoing flush or did we fail last attempt?");
+ } else {
+ if (!this.kvset.isEmpty()) {
+ this.snapshot = this.kvset;
+ this.kvset = new KeyValueSkipListSet(this.comparator);
+ this.snapshotTimeRangeTracker = this.timeRangeTracker;
+ this.timeRangeTracker = new TimeRangeTracker();
+ // Reset heap to not include any keys
+ this.size.set(DEEP_OVERHEAD);
+ // Reset allocator so we get a fresh buffer for the new memstore
+ if (allocator != null) {
+ this.allocator = new MemStoreLAB(conf);
}
+ timeOfOldestEdit = Long.MAX_VALUE;
}
- } finally {
- this.lock.writeLock().unlock();
}
}
@@ -190,20 +186,15 @@ public class MemStore implements HeapSiz
*/
void clearSnapshot(final SortedSet<KeyValue> ss)
throws UnexpectedException {
- this.lock.writeLock().lock();
- try {
- if (this.snapshot != ss) {
- throw new UnexpectedException("Current snapshot is " +
- this.snapshot + ", was passed " + ss);
- }
- // OK. Passed in snapshot is same as current snapshot. If not-empty,
- // create a new snapshot and let the old one go.
- if (!ss.isEmpty()) {
- this.snapshot = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = new TimeRangeTracker();
- }
- } finally {
- this.lock.writeLock().unlock();
+ if (this.snapshot != ss) {
+ throw new UnexpectedException("Current snapshot is " +
+ this.snapshot + ", was passed " + ss);
+ }
+ // OK. Passed in snapshot is same as current snapshot. If not-empty,
+ // create a new snapshot and let the old one go.
+ if (!ss.isEmpty()) {
+ this.snapshot = new KeyValueSkipListSet(this.comparator);
+ this.snapshotTimeRangeTracker = new TimeRangeTracker();
}
}
@@ -213,13 +204,8 @@ public class MemStore implements HeapSiz
* @return approximate size of the passed key and value.
*/
long add(final KeyValue kv) {
- this.lock.readLock().lock();
- try {
- KeyValue toAdd = maybeCloneWithAllocator(kv);
- return internalAdd(toAdd);
- } finally {
- this.lock.readLock().unlock();
- }
+ KeyValue toAdd = maybeCloneWithAllocator(kv);
+ return internalAdd(toAdd);
}
long timeOfOldestEdit() {
@@ -285,26 +271,21 @@ public class MemStore implements HeapSiz
* @param kv
*/
void rollback(final KeyValue kv) {
- this.lock.readLock().lock();
- try {
- // If the key is in the snapshot, delete it. We should not update
- // this.size, because that tracks the size of only the memstore and
- // not the snapshot. The flush of this snapshot to disk has not
- // yet started because Store.flush() waits for all rwcc transactions to
- // commit before starting the flush to disk.
- KeyValue found = this.snapshot.get(kv);
- if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
- this.snapshot.remove(kv);
- }
- // If the key is in the memstore, delete it. Update this.size.
- found = this.kvset.get(kv);
- if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
- removeFromKVSet(kv);
- long s = heapSizeChange(kv, true);
- this.size.addAndGet(-s);
- }
- } finally {
- this.lock.readLock().unlock();
+ // If the key is in the snapshot, delete it. We should not update
+ // this.size, because that tracks the size of only the memstore and
+ // not the snapshot. The flush of this snapshot to disk has not
+ // yet started because Store.flush() waits for all rwcc transactions to
+ // commit before starting the flush to disk.
+ KeyValue found = this.snapshot.get(kv);
+ if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+ this.snapshot.remove(kv);
+ }
+ // If the key is in the memstore, delete it. Update this.size.
+ found = this.kvset.get(kv);
+ if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+ removeFromKVSet(kv);
+ long s = heapSizeChange(kv, true);
+ this.size.addAndGet(-s);
}
}
@@ -314,15 +295,9 @@ public class MemStore implements HeapSiz
* @return approximate size of the passed key and value.
*/
long delete(final KeyValue delete) {
- long s = 0;
- this.lock.readLock().lock();
- try {
- KeyValue toAdd = maybeCloneWithAllocator(delete);
- s += heapSizeChange(toAdd, addToKVSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- } finally {
- this.lock.readLock().unlock();
- }
+ KeyValue toAdd = maybeCloneWithAllocator(delete);
+ long s = heapSizeChange(toAdd, addToKVSet(toAdd));
+ timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s;
}
@@ -333,12 +308,7 @@ public class MemStore implements HeapSiz
* @return Next row or null if none found.
*/
KeyValue getNextRow(final KeyValue kv) {
- this.lock.readLock().lock();
- try {
- return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
- } finally {
- this.lock.readLock().unlock();
- }
+ return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
}
/*
@@ -382,13 +352,8 @@ public class MemStore implements HeapSiz
* @param state column/delete tracking state
*/
void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
- this.lock.readLock().lock();
- try {
- getRowKeyAtOrBefore(kvset, state);
- getRowKeyAtOrBefore(snapshot, state);
- } finally {
- this.lock.readLock().unlock();
- }
+ getRowKeyAtOrBefore(kvset, state);
+ getRowKeyAtOrBefore(snapshot, state);
}
/*
@@ -480,54 +445,49 @@ public class MemStore implements HeapSiz
byte[] qualifier,
long newValue,
long now) {
- this.lock.readLock().lock();
- try {
- KeyValue firstKv = KeyValue.createFirstOnRow(
- row, family, qualifier);
- // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
- if (!snSs.isEmpty()) {
- KeyValue snKv = snSs.first();
- // is there a matching KV in the snapshot?
- if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
- if (snKv.getTimestamp() == now) {
- // poop,
- now += 1;
- }
+ KeyValue firstKv = KeyValue.createFirstOnRow(
+ row, family, qualifier);
+ // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+ SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
+ if (!snSs.isEmpty()) {
+ KeyValue snKv = snSs.first();
+ // is there a matching KV in the snapshot?
+ if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+ if (snKv.getTimestamp() == now) {
+ // poop,
+ now += 1;
}
}
+ }
- // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
- // But the timestamp should also be max(now, mostRecentTsInMemstore)
+ // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+ // But the timestamp should also be max(now, mostRecentTsInMemstore)
- // so we cant add the new KV w/o knowing what's there already, but we also
- // want to take this chance to delete some kvs. So two loops (sad)
+ // so we cant add the new KV w/o knowing what's there already, but we also
+ // want to take this chance to delete some kvs. So two loops (sad)
- SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
- Iterator<KeyValue> it = ss.iterator();
- while ( it.hasNext() ) {
- KeyValue kv = it.next();
-
- // if this isnt the row we are interested in, then bail:
- if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
- break; // rows dont match, bail.
- }
+ SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
+ Iterator<KeyValue> it = ss.iterator();
+ while ( it.hasNext() ) {
+ KeyValue kv = it.next();
- // if the qualifier matches and it's a put, just RM it out of the kvset.
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
- now = kv.getTimestamp();
- }
+ // if this isnt the row we are interested in, then bail:
+ if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
+ break; // rows dont match, bail.
}
- // create or update (upsert) a new KeyValue with
- // 'now' and a 0 memstoreTS == immediately visible
- return upsert(Arrays.asList(
- new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
- );
- } finally {
- this.lock.readLock().unlock();
+ // if the qualifier matches and it's a put, just RM it out of the kvset.
+ if (kv.getType() == KeyValue.Type.Put.getCode() &&
+ kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
+ now = kv.getTimestamp();
+ }
}
+
+ // create or update (upsert) a new KeyValue with
+ // 'now' and a 0 memstoreTS == immediately visible
+ return upsert(Arrays.asList(
+ new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
+ );
}
/**
@@ -548,17 +508,12 @@ public class MemStore implements HeapSiz
* @return change in memstore size
*/
public long upsert(List<KeyValue> kvs) {
- this.lock.readLock().lock();
- try {
- long size = 0;
- for (KeyValue kv : kvs) {
- kv.setMemstoreTS(0);
- size += upsert(kv);
- }
- return size;
- } finally {
- this.lock.readLock().unlock();
+ long size = 0;
+ for (KeyValue kv : kvs) {
+ kv.setMemstoreTS(0);
+ size += upsert(kv);
}
+ return size;
}
/**
@@ -665,13 +620,8 @@ public class MemStore implements HeapSiz
* @return scanner on memstore and snapshot in this order.
*/
List<KeyValueScanner> getScanners() {
- this.lock.readLock().lock();
- try {
- return Collections.<KeyValueScanner>singletonList(
- new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint()));
- } finally {
- this.lock.readLock().unlock();
- }
+ return Collections.<KeyValueScanner>singletonList(
+ new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint()));
}
/**
@@ -716,7 +666,7 @@ public class MemStore implements HeapSiz
// the pre-calculated KeyValue to be returned by peek() or next()
private KeyValue theNext;
- private long readPoint;
+ private final long readPoint;
/*
Some notes...
@@ -930,7 +880,7 @@ public class MemStore implements HeapSiz
ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
+ ClassSize.ATOMIC_LONG +
(2 * ClassSize.TIMERANGE_TRACKER) +
(2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1542104&r1=1542103&r2=1542104&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Nov 14 22:06:34 2013
@@ -32,7 +32,6 @@ import java.util.SortedSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
@@ -758,7 +757,12 @@ public class Store extends SchemaConfigu
* {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
*/
void snapshot() {
- this.memstore.snapshot();
+ this.lock.writeLock().lock();
+ try {
+ this.memstore.snapshot();
+ } finally {
+ this.lock.writeLock().unlock();
+ }
}
/**
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1542104&r1=1542103&r2=1542104&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Thu Nov 14 22:06:34 2013
@@ -298,14 +298,12 @@ public class TestHeapSize extends TestCa
// MemStore Deep Overhead
actual = MemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
- expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false));
expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
if(expected != actual) {
ClassSize.estimateBase(cl, true);
- ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(KeyValueSkipListSet.class, true);
ClassSize.estimateBase(KeyValueSkipListSet.class, true);