You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nk...@apache.org on 2013/11/14 12:16:21 UTC
svn commit: r1541880 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/
hbase-common/src/main/java/org/apache/hadoop/hbase/
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/
hbase-server/src/test/java/org/apache/hadoop/hbase/io/
Author: nkeywal
Date: Thu Nov 14 11:16:20 2013
New Revision: 1541880
URL: http://svn.apache.org/r1541880
Log:
HBASE-9963 Remove the ReentrantReadWriteLock in the MemStore
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1541880&r1=1541879&r2=1541880&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Thu Nov 14 11:16:20 2013
@@ -37,7 +37,7 @@ import java.util.Map;
/**
* Used to communicate with a single HBase table.
- * Obtain an instance from an {@ink HConnection}.
+ * Obtain an instance from an {@link HConnection}.
*
* @since 0.21.0
*/
Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1541880&r1=1541879&r2=1541880&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Thu Nov 14 11:16:20 2013
@@ -332,6 +332,19 @@ public class KeyValue implements Cell, H
this.length = length;
}
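+ /**
+ * Creates a KeyValue holding only a row and a timestamp: no family, qualifier
+ * or value is set and the type is {@link Type#Maximum}. Note that
+ * <code>bytes</code> holds just the row key, not a serialized KeyValue.
+ * @param bytes byte array containing the row key
+ * @param offset offset of the row key within <code>bytes</code>
+ * @param length length of the row key
+ * @param ts timestamp of the created KeyValue
+ */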
+ public KeyValue(final byte[] bytes, final int offset, final int length, long ts) {
+ this(bytes, offset, length, null, 0, 0, null, 0, 0, ts, Type.Maximum, null, 0, 0, null);
+ }
+
/** Constructors that build a new backing byte array from fields */
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1541880&r1=1541879&r2=1541880&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Thu Nov 14 11:16:20 2013
@@ -683,7 +683,12 @@ public class HStore implements Store {
* so it has some work to do.
*/
void snapshot() {
- this.memstore.snapshot();
+ this.lock.writeLock().lock();
+ try {
+ this.memstore.snapshot();
+ } finally {
+ this.lock.writeLock().unlock();
+ }
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1541880&r1=1541879&r2=1541880&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Nov 14 11:16:20 2013
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,6 +52,11 @@ import org.apache.hadoop.hbase.util.Envi
* to snapshot and is cleared. We continue to serve edits out of new memstore
* and backing snapshot until flusher reports in that the flush succeeded. At
* this point we let the snapshot go.
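+ * <p>
+ * MemStore does no locking of its own. Callers must hold a write lock around
+ * snapshot() and clearSnapshot(), and a read lock around the other operations
+ * (add, delete, rollback, upsert, getScanners, ...). This is done in {@link HStore}. </p>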
+ *
* TODO: Adjust size of the memstore when we remove items because they have
* been deleted.
* TODO: With new KVSLS, need to make sure we update HeapSize with difference
@@ -78,8 +82,6 @@ public class MemStore implements HeapSiz
// Snapshot of memstore. Made for flusher.
volatile KeyValueSkipListSet snapshot;
- final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
final KeyValue.KVComparator comparator;
// Used to track own heapSize
@@ -139,31 +141,26 @@ public class MemStore implements HeapSiz
* To get the snapshot made by this method, use {@link #getSnapshot()}
*/
void snapshot() {
- this.lock.writeLock().lock();
- try {
- // If snapshot currently has entries, then flusher failed or didn't call
- // cleanup. Log a warning.
- if (!this.snapshot.isEmpty()) {
- LOG.warn("Snapshot called again without clearing previous. " +
+ // If snapshot currently has entries, then flusher failed or didn't call
+ // cleanup. Log a warning.
+ if (!this.snapshot.isEmpty()) {
+ LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
- } else {
- if (!this.kvset.isEmpty()) {
- this.snapshot = this.kvset;
- this.kvset = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = this.timeRangeTracker;
- this.timeRangeTracker = new TimeRangeTracker();
- // Reset heap to not include any keys
- this.size.set(DEEP_OVERHEAD);
- this.snapshotAllocator = this.allocator;
- // Reset allocator so we get a fresh buffer for the new memstore
- if (allocator != null) {
- this.allocator = new MemStoreLAB(conf, chunkPool);
- }
- timeOfOldestEdit = Long.MAX_VALUE;
+ } else {
+ if (!this.kvset.isEmpty()) {
+ this.snapshot = this.kvset;
+ this.kvset = new KeyValueSkipListSet(this.comparator);
+ this.snapshotTimeRangeTracker = this.timeRangeTracker;
+ this.timeRangeTracker = new TimeRangeTracker();
+ // Reset heap to not include any keys
+ this.size.set(DEEP_OVERHEAD);
+ this.snapshotAllocator = this.allocator;
+ // Reset allocator so we get a fresh buffer for the new memstore
+ if (allocator != null) {
+ this.allocator = new MemStoreLAB(conf, chunkPool);
}
+ timeOfOldestEdit = Long.MAX_VALUE;
}
- } finally {
- this.lock.writeLock().unlock();
}
}
@@ -188,24 +185,19 @@ public class MemStore implements HeapSiz
void clearSnapshot(final SortedSet<KeyValue> ss)
throws UnexpectedException {
MemStoreLAB tmpAllocator = null;
- this.lock.writeLock().lock();
- try {
- if (this.snapshot != ss) {
- throw new UnexpectedException("Current snapshot is " +
+ if (this.snapshot != ss) {
+ throw new UnexpectedException("Current snapshot is " +
this.snapshot + ", was passed " + ss);
- }
- // OK. Passed in snapshot is same as current snapshot. If not-empty,
- // create a new snapshot and let the old one go.
- if (!ss.isEmpty()) {
- this.snapshot = new KeyValueSkipListSet(this.comparator);
- this.snapshotTimeRangeTracker = new TimeRangeTracker();
- }
- if (this.snapshotAllocator != null) {
- tmpAllocator = this.snapshotAllocator;
- this.snapshotAllocator = null;
- }
- } finally {
- this.lock.writeLock().unlock();
+ }
+ // OK. Passed in snapshot is same as current snapshot. If not-empty,
+ // create a new snapshot and let the old one go.
+ if (!ss.isEmpty()) {
+ this.snapshot = new KeyValueSkipListSet(this.comparator);
+ this.snapshotTimeRangeTracker = new TimeRangeTracker();
+ }
+ if (this.snapshotAllocator != null) {
+ tmpAllocator = this.snapshotAllocator;
+ this.snapshotAllocator = null;
}
if (tmpAllocator != null) {
tmpAllocator.close();
@@ -218,13 +210,8 @@ public class MemStore implements HeapSiz
* @return approximate size of the passed key and value.
*/
long add(final KeyValue kv) {
- this.lock.readLock().lock();
- try {
- KeyValue toAdd = maybeCloneWithAllocator(kv);
- return internalAdd(toAdd);
- } finally {
- this.lock.readLock().unlock();
- }
+ KeyValue toAdd = maybeCloneWithAllocator(kv);
+ return internalAdd(toAdd);
}
long timeOfOldestEdit() {
@@ -274,7 +261,7 @@ public class MemStore implements HeapSiz
// not to do anything with it.
return kv;
}
- assert alloc != null && alloc.getData() != null;
+ assert alloc.getData() != null;
System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
newKv.setMvccVersion(kv.getMvccVersion());
@@ -290,26 +277,21 @@ public class MemStore implements HeapSiz
* @param kv
*/
void rollback(final KeyValue kv) {
- this.lock.readLock().lock();
- try {
- // If the key is in the snapshot, delete it. We should not update
- // this.size, because that tracks the size of only the memstore and
- // not the snapshot. The flush of this snapshot to disk has not
- // yet started because Store.flush() waits for all rwcc transactions to
- // commit before starting the flush to disk.
- KeyValue found = this.snapshot.get(kv);
- if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
- this.snapshot.remove(kv);
- }
- // If the key is in the memstore, delete it. Update this.size.
- found = this.kvset.get(kv);
- if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
- removeFromKVSet(kv);
- long s = heapSizeChange(kv, true);
- this.size.addAndGet(-s);
- }
- } finally {
- this.lock.readLock().unlock();
+ // If the key is in the snapshot, delete it. We should not update
+ // this.size, because that tracks the size of only the memstore and
+ // not the snapshot. The flush of this snapshot to disk has not
+ // yet started because Store.flush() waits for all rwcc transactions to
+ // commit before starting the flush to disk.
+ KeyValue found = this.snapshot.get(kv);
+ if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
+ this.snapshot.remove(kv);
+ }
+ // If the key is in the memstore, delete it. Update this.size.
+ found = this.kvset.get(kv);
+ if (found != null && found.getMvccVersion() == kv.getMvccVersion()) {
+ removeFromKVSet(kv);
+ long s = heapSizeChange(kv, true);
+ this.size.addAndGet(-s);
}
}
@@ -320,14 +302,9 @@ public class MemStore implements HeapSiz
*/
long delete(final KeyValue delete) {
long s = 0;
- this.lock.readLock().lock();
- try {
- KeyValue toAdd = maybeCloneWithAllocator(delete);
- s += heapSizeChange(toAdd, addToKVSet(toAdd));
- timeRangeTracker.includeTimestamp(toAdd);
- } finally {
- this.lock.readLock().unlock();
- }
+ KeyValue toAdd = maybeCloneWithAllocator(delete);
+ s += heapSizeChange(toAdd, addToKVSet(toAdd));
+ timeRangeTracker.includeTimestamp(toAdd);
this.size.addAndGet(s);
return s;
}
@@ -338,12 +315,7 @@ public class MemStore implements HeapSiz
* @return Next row or null if none found.
*/
KeyValue getNextRow(final KeyValue kv) {
- this.lock.readLock().lock();
- try {
- return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
- } finally {
- this.lock.readLock().unlock();
- }
+ return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
}
/*
@@ -387,13 +359,8 @@ public class MemStore implements HeapSiz
* @param state column/delete tracking state
*/
void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
- this.lock.readLock().lock();
- try {
- getRowKeyAtOrBefore(kvset, state);
- getRowKeyAtOrBefore(snapshot, state);
- } finally {
- this.lock.readLock().unlock();
- }
+ getRowKeyAtOrBefore(kvset, state);
+ getRowKeyAtOrBefore(snapshot, state);
}
/*
@@ -459,7 +426,8 @@ public class MemStore implements HeapSiz
// Stop looking if we've exited the better candidate range.
if (!state.isBetterCandidate(p.kv)) break;
// Make into firstOnRow
- firstOnRow = new KeyValue(p.kv.getRow(), HConstants.LATEST_TIMESTAMP);
+ firstOnRow = new KeyValue(p.kv.getRowArray(), p.kv.getRowOffset(), p.kv.getRowLength(),
+ HConstants.LATEST_TIMESTAMP);
// If we find something, break;
if (walkForwardInSingleRow(p.set, firstOnRow, state)) break;
}
@@ -487,54 +455,46 @@ public class MemStore implements HeapSiz
byte[] qualifier,
long newValue,
long now) {
- this.lock.readLock().lock();
- try {
- KeyValue firstKv = KeyValue.createFirstOnRow(
- row, family, qualifier);
- // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
- SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
- if (!snSs.isEmpty()) {
- KeyValue snKv = snSs.first();
- // is there a matching KV in the snapshot?
- if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
- if (snKv.getTimestamp() == now) {
- // poop,
- now += 1;
- }
+ KeyValue firstKv = KeyValue.createFirstOnRow(
+ row, family, qualifier);
+ // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+ SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
+ if (!snSs.isEmpty()) {
+ KeyValue snKv = snSs.first();
+ // is there a matching KV in the snapshot?
+ if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+ if (snKv.getTimestamp() == now) {
+ // same timestamp as the snapshot KV: bump it so the new cell sorts as newer
+ now += 1;
}
}
+ }
- // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
- // But the timestamp should also be max(now, mostRecentTsInMemstore)
-
- // so we cant add the new KV w/o knowing what's there already, but we also
- // want to take this chance to delete some kvs. So two loops (sad)
+ // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+ // But the timestamp should also be max(now, mostRecentTsInMemstore)
- SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
- Iterator<KeyValue> it = ss.iterator();
- while ( it.hasNext() ) {
- KeyValue kv = it.next();
-
- // if this isnt the row we are interested in, then bail:
- if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
- break; // rows dont match, bail.
- }
+ // so we can't add the new KV w/o knowing what's there already; removal of
+ // older versions is left to upsert() below.
- // if the qualifier matches and it's a put, just RM it out of the kvset.
- if (kv.getType() == KeyValue.Type.Put.getCode() &&
- kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
- now = kv.getTimestamp();
- }
+ SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
+ for (KeyValue kv : ss) {
+ // if this isnt the row we are interested in, then bail:
+ if (!kv.matchingColumn(family, qualifier) || !kv.matchingRow(firstKv)) {
+ break; // rows dont match, bail.
}
- // create or update (upsert) a new KeyValue with
- // 'now' and a 0 memstoreTS == immediately visible
- List<Cell> cells = new ArrayList<Cell>(1);
- cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
- return upsert(cells, 1L);
- } finally {
- this.lock.readLock().unlock();
+ // if the qualifier matches and it's a newer Put, raise 'now' to its timestamp.
+ if (kv.getTypeByte() == KeyValue.Type.Put.getCode() &&
+ kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
+ now = kv.getTimestamp();
+ }
}
+
+ // create or update (upsert) a new KeyValue with
+ // 'now' and a 0 memstoreTS == immediately visible
+ List<Cell> cells = new ArrayList<Cell>(1);
+ cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
+ return upsert(cells, 1L);
}
/**
@@ -556,16 +516,11 @@ public class MemStore implements HeapSiz
* @return change in memstore size
*/
public long upsert(Iterable<Cell> cells, long readpoint) {
- this.lock.readLock().lock();
- try {
- long size = 0;
- for (Cell cell : cells) {
- size += upsert(cell, readpoint);
- }
- return size;
- } finally {
- this.lock.readLock().unlock();
+ long size = 0;
+ for (Cell cell : cells) {
+ size += upsert(cell, readpoint);
}
+ return size;
}
/**
@@ -612,7 +567,8 @@ public class MemStore implements HeapSiz
// check that this is the row and column we are interested in, otherwise bail
if (kv.matchingRow(cur) && kv.matchingQualifier(cur)) {
// only remove Puts that concurrent scanners cannot possibly see
- if (cur.getType() == KeyValue.Type.Put.getCode() && cur.getMvccVersion() <= readpoint) {
+ if (cur.getTypeByte() == KeyValue.Type.Put.getCode() &&
+ cur.getMvccVersion() <= readpoint) {
if (versionsVisible > 1) {
// if we get here we have seen at least one version visible to the oldest scanner,
// which means we can prove that no scanner will see this version
@@ -675,13 +631,8 @@ public class MemStore implements HeapSiz
* @return scanner on memstore and snapshot in this order.
*/
List<KeyValueScanner> getScanners(long readPt) {
- this.lock.readLock().lock();
- try {
- return Collections.<KeyValueScanner>singletonList(
- new MemStoreScanner(readPt));
- } finally {
- this.lock.readLock().unlock();
- }
+ return Collections.<KeyValueScanner>singletonList(
+ new MemStoreScanner(readPt));
}
/**
@@ -959,16 +910,12 @@ public class MemStore implements HeapSiz
}
public final static long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
+ ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
- (2 * ClassSize.TIMERANGE_TRACKER) +
+ ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
(2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
- /** Used for readability when we don't store memstore timestamp in HFile */
- public static final boolean NO_PERSISTENT_TS = false;
-
/*
* Calculate how the MemStore size has changed. Includes overhead of the
* backing Map.
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1541880&r1=1541879&r2=1541880&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Thu Nov 14 11:16:20 2013
@@ -40,7 +40,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
@@ -49,7 +48,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueSkipListSet;
import org.apache.hadoop.hbase.regionserver.MemStore;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.ClassSize;
import org.junit.BeforeClass;
@@ -305,14 +303,12 @@ public class TestHeapSize {
// MemStore Deep Overhead
actual = MemStore.DEEP_OVERHEAD;
expected = ClassSize.estimateBase(cl, false);
- expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
expected += ClassSize.estimateBase(AtomicLong.class, false);
expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false));
expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
if(expected != actual) {
ClassSize.estimateBase(cl, true);
- ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
ClassSize.estimateBase(AtomicLong.class, true);
ClassSize.estimateBase(KeyValueSkipListSet.class, true);
ClassSize.estimateBase(KeyValueSkipListSet.class, true);