You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2013/11/14 23:06:34 UTC

svn commit: r1542104 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/regionserver/MemStore.java main/java/org/apache/hadoop/hbase/regionserver/Store.java test/java/org/apache/hadoop/hbase/io/TestHeapSize.java

Author: larsh
Date: Thu Nov 14 22:06:34 2013
New Revision: 1542104

URL: http://svn.apache.org/r1542104
Log:
HBASE-9963 Remove the ReentrantReadWriteLock in the MemStore (Nicolas Liochon)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1542104&r1=1542103&r2=1542104&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Thu Nov 14 22:06:34 2013
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,6 +50,10 @@ import org.apache.hadoop.hbase.util.Envi
  * to snapshot and is cleared.  We continue to serve edits out of new memstore
  * and backing snapshot until flusher reports in that the flush succeeded. At
  * this point we let the snapshot go.
+ * <p>
+ * The MemStore does no locking of its own. Callers must hold the Store's read
+ * lock for ordinary calls and its write lock for snapshot(). See {@link Store}.
+ * </p>
  * TODO: Adjust size of the memstore when we remove items because they have
  * been deleted.
  * TODO: With new KVSLS, need to make sure we update HeapSize with difference
@@ -75,8 +78,6 @@ public class MemStore implements HeapSiz
   // Snapshot of memstore.  Made for flusher.
   volatile KeyValueSkipListSet snapshot;
 
-  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
   final KeyValue.KVComparator comparator;
 
   // Used comparing versions -- same r/c and ts but different type.
@@ -143,30 +144,25 @@ public class MemStore implements HeapSiz
    * To get the snapshot made by this method, use {@link #getSnapshot()}
    */
   void snapshot() {
-    this.lock.writeLock().lock();
-    try {
-      // If snapshot currently has entries, then flusher failed or didn't call
-      // cleanup.  Log a warning.
-      if (!this.snapshot.isEmpty()) {
-        LOG.warn("Snapshot called again without clearing previous. " +
-          "Doing nothing. Another ongoing flush or did we fail last attempt?");
-      } else {
-        if (!this.kvset.isEmpty()) {
-          this.snapshot = this.kvset;
-          this.kvset = new KeyValueSkipListSet(this.comparator);
-          this.snapshotTimeRangeTracker = this.timeRangeTracker;
-          this.timeRangeTracker = new TimeRangeTracker();
-          // Reset heap to not include any keys
-          this.size.set(DEEP_OVERHEAD);
-          // Reset allocator so we get a fresh buffer for the new memstore
-          if (allocator != null) {
-            this.allocator = new MemStoreLAB(conf);
-          }
-          timeOfOldestEdit = Long.MAX_VALUE;
+    // If snapshot currently has entries, then flusher failed or didn't call
+    // cleanup.  Log a warning.
+    if (!this.snapshot.isEmpty()) {
+      LOG.warn("Snapshot called again without clearing previous. " +
+        "Doing nothing. Another ongoing flush or did we fail last attempt?");
+    } else {
+      if (!this.kvset.isEmpty()) {
+        this.snapshot = this.kvset;
+        this.kvset = new KeyValueSkipListSet(this.comparator);
+        this.snapshotTimeRangeTracker = this.timeRangeTracker;
+        this.timeRangeTracker = new TimeRangeTracker();
+        // Reset heap to not include any keys
+        this.size.set(DEEP_OVERHEAD);
+        // Reset allocator so we get a fresh buffer for the new memstore
+        if (allocator != null) {
+          this.allocator = new MemStoreLAB(conf);
         }
+        timeOfOldestEdit = Long.MAX_VALUE;
       }
-    } finally {
-      this.lock.writeLock().unlock();
     }
   }
 
@@ -190,20 +186,15 @@ public class MemStore implements HeapSiz
    */
   void clearSnapshot(final SortedSet<KeyValue> ss)
   throws UnexpectedException {
-    this.lock.writeLock().lock();
-    try {
-      if (this.snapshot != ss) {
-        throw new UnexpectedException("Current snapshot is " +
-          this.snapshot + ", was passed " + ss);
-      }
-      // OK. Passed in snapshot is same as current snapshot.  If not-empty,
-      // create a new snapshot and let the old one go.
-      if (!ss.isEmpty()) {
-        this.snapshot = new KeyValueSkipListSet(this.comparator);
-        this.snapshotTimeRangeTracker = new TimeRangeTracker();
-      }
-    } finally {
-      this.lock.writeLock().unlock();
+    if (this.snapshot != ss) {
+      throw new UnexpectedException("Current snapshot is " +
+        this.snapshot + ", was passed " + ss);
+    }
+    // OK. Passed in snapshot is same as current snapshot.  If not-empty,
+    // create a new snapshot and let the old one go.
+    if (!ss.isEmpty()) {
+      this.snapshot = new KeyValueSkipListSet(this.comparator);
+      this.snapshotTimeRangeTracker = new TimeRangeTracker();
     }
   }
 
@@ -213,13 +204,8 @@ public class MemStore implements HeapSiz
    * @return approximate size of the passed key and value.
    */
   long add(final KeyValue kv) {
-    this.lock.readLock().lock();
-    try {
-      KeyValue toAdd = maybeCloneWithAllocator(kv);
-      return internalAdd(toAdd);
-    } finally {
-      this.lock.readLock().unlock();
-    }
+    KeyValue toAdd = maybeCloneWithAllocator(kv);
+    return internalAdd(toAdd);
   }
 
   long timeOfOldestEdit() {
@@ -285,26 +271,21 @@ public class MemStore implements HeapSiz
    * @param kv
    */
   void rollback(final KeyValue kv) {
-    this.lock.readLock().lock();
-    try {
-      // If the key is in the snapshot, delete it. We should not update
-      // this.size, because that tracks the size of only the memstore and
-      // not the snapshot. The flush of this snapshot to disk has not
-      // yet started because Store.flush() waits for all rwcc transactions to
-      // commit before starting the flush to disk.
-      KeyValue found = this.snapshot.get(kv);
-      if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
-        this.snapshot.remove(kv);
-      }
-      // If the key is in the memstore, delete it. Update this.size.
-      found = this.kvset.get(kv);
-      if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
-        removeFromKVSet(kv);
-        long s = heapSizeChange(kv, true);
-        this.size.addAndGet(-s);
-      }
-    } finally {
-      this.lock.readLock().unlock();
+    // If the key is in the snapshot, delete it. We should not update
+    // this.size, because that tracks the size of only the memstore and
+    // not the snapshot. The flush of this snapshot to disk has not
+    // yet started because Store.flush() waits for all rwcc transactions to
+    // commit before starting the flush to disk.
+    KeyValue found = this.snapshot.get(kv);
+    if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+      this.snapshot.remove(kv);
+    }
+    // If the key is in the memstore, delete it. Update this.size.
+    found = this.kvset.get(kv);
+    if (found != null && found.getMemstoreTS() == kv.getMemstoreTS()) {
+      removeFromKVSet(kv);
+      long s = heapSizeChange(kv, true);
+      this.size.addAndGet(-s);
     }
   }
 
@@ -314,15 +295,9 @@ public class MemStore implements HeapSiz
    * @return approximate size of the passed key and value.
    */
   long delete(final KeyValue delete) {
-    long s = 0;
-    this.lock.readLock().lock();
-    try {
-      KeyValue toAdd = maybeCloneWithAllocator(delete);
-      s += heapSizeChange(toAdd, addToKVSet(toAdd));
-      timeRangeTracker.includeTimestamp(toAdd);
-    } finally {
-      this.lock.readLock().unlock();
-    }
+    KeyValue toAdd = maybeCloneWithAllocator(delete);
+    long s = heapSizeChange(toAdd, addToKVSet(toAdd));
+    timeRangeTracker.includeTimestamp(toAdd);
     this.size.addAndGet(s);
     return s;
   }
@@ -333,12 +308,7 @@ public class MemStore implements HeapSiz
    * @return Next row or null if none found.
    */
   KeyValue getNextRow(final KeyValue kv) {
-    this.lock.readLock().lock();
-    try {
-      return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
-    } finally {
-      this.lock.readLock().unlock();
-    }
+    return getLowest(getNextRow(kv, this.kvset), getNextRow(kv, this.snapshot));
   }
 
   /*
@@ -382,13 +352,8 @@ public class MemStore implements HeapSiz
    * @param state column/delete tracking state
    */
   void getRowKeyAtOrBefore(final GetClosestRowBeforeTracker state) {
-    this.lock.readLock().lock();
-    try {
-      getRowKeyAtOrBefore(kvset, state);
-      getRowKeyAtOrBefore(snapshot, state);
-    } finally {
-      this.lock.readLock().unlock();
-    }
+    getRowKeyAtOrBefore(kvset, state);
+    getRowKeyAtOrBefore(snapshot, state);
   }
 
   /*
@@ -480,54 +445,49 @@ public class MemStore implements HeapSiz
                                 byte[] qualifier,
                                 long newValue,
                                 long now) {
-   this.lock.readLock().lock();
-    try {
-      KeyValue firstKv = KeyValue.createFirstOnRow(
-          row, family, qualifier);
-      // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
-      SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
-      if (!snSs.isEmpty()) {
-        KeyValue snKv = snSs.first();
-        // is there a matching KV in the snapshot?
-        if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
-          if (snKv.getTimestamp() == now) {
-            // poop,
-            now += 1;
-          }
+    KeyValue firstKv = KeyValue.createFirstOnRow(
+        row, family, qualifier);
+    // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
+    SortedSet<KeyValue> snSs = snapshot.tailSet(firstKv);
+    if (!snSs.isEmpty()) {
+      KeyValue snKv = snSs.first();
+      // is there a matching KV in the snapshot?
+      if (snKv.matchingRow(firstKv) && snKv.matchingQualifier(firstKv)) {
+        if (snKv.getTimestamp() == now) {
+          // poop,
+          now += 1;
         }
       }
+    }
 
-      // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
-      // But the timestamp should also be max(now, mostRecentTsInMemstore)
+    // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary.
+    // But the timestamp should also be max(now, mostRecentTsInMemstore)
 
-      // so we cant add the new KV w/o knowing what's there already, but we also
-      // want to take this chance to delete some kvs. So two loops (sad)
+    // so we cant add the new KV w/o knowing what's there already, but we also
+    // want to take this chance to delete some kvs. So two loops (sad)
 
-      SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
-      Iterator<KeyValue> it = ss.iterator();
-      while ( it.hasNext() ) {
-        KeyValue kv = it.next();
-
-        // if this isnt the row we are interested in, then bail:
-        if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
-          break; // rows dont match, bail.
-        }
+    SortedSet<KeyValue> ss = kvset.tailSet(firstKv);
+    Iterator<KeyValue> it = ss.iterator();
+    while ( it.hasNext() ) {
+      KeyValue kv = it.next();
 
-        // if the qualifier matches and it's a put, just RM it out of the kvset.
-        if (kv.getType() == KeyValue.Type.Put.getCode() &&
-            kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
-          now = kv.getTimestamp();
-        }
+      // if this isnt the row we are interested in, then bail:
+      if (!kv.matchingColumn(family,qualifier) || !kv.matchingRow(firstKv) ) {
+        break; // rows dont match, bail.
       }
 
-      // create or update (upsert) a new KeyValue with
-      // 'now' and a 0 memstoreTS == immediately visible
-      return upsert(Arrays.asList(
-          new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
-      );
-    } finally {
-      this.lock.readLock().unlock();
+      // if the qualifier matches and it's a put, just RM it out of the kvset.
+      if (kv.getType() == KeyValue.Type.Put.getCode() &&
+          kv.getTimestamp() > now && firstKv.matchingQualifier(kv)) {
+        now = kv.getTimestamp();
+      }
     }
+
+    // create or update (upsert) a new KeyValue with
+    // 'now' and a 0 memstoreTS == immediately visible
+    return upsert(Arrays.asList(
+        new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)))
+    );
   }
 
   /**
@@ -548,17 +508,12 @@ public class MemStore implements HeapSiz
    * @return change in memstore size
    */
   public long upsert(List<KeyValue> kvs) {
-   this.lock.readLock().lock();
-    try {
-      long size = 0;
-      for (KeyValue kv : kvs) {
-        kv.setMemstoreTS(0);
-        size += upsert(kv);
-      }
-      return size;
-    } finally {
-      this.lock.readLock().unlock();
+    long size = 0;
+    for (KeyValue kv : kvs) {
+      kv.setMemstoreTS(0);
+      size += upsert(kv);
     }
+    return size;
   }
 
   /**
@@ -665,13 +620,8 @@ public class MemStore implements HeapSiz
    * @return scanner on memstore and snapshot in this order.
    */
   List<KeyValueScanner> getScanners() {
-    this.lock.readLock().lock();
-    try {
-      return Collections.<KeyValueScanner>singletonList(
-          new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint()));
-    } finally {
-      this.lock.readLock().unlock();
-    }
+    return Collections.<KeyValueScanner>singletonList(
+        new MemStoreScanner(MultiVersionConsistencyControl.getThreadReadPoint()));
   }
 
   /**
@@ -716,7 +666,7 @@ public class MemStore implements HeapSiz
 
     // the pre-calculated KeyValue to be returned by peek() or next()
     private KeyValue theNext;
-    private long readPoint;
+    private final long readPoint;
 
     /*
     Some notes...
@@ -930,7 +880,7 @@ public class MemStore implements HeapSiz
       ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
+      ClassSize.ATOMIC_LONG +
       (2 * ClassSize.TIMERANGE_TRACKER) +
       (2 * ClassSize.KEYVALUE_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP));
 

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1542104&r1=1542103&r2=1542104&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Nov 14 22:06:34 2013
@@ -32,7 +32,6 @@ import java.util.SortedSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
@@ -758,7 +757,12 @@ public class Store extends SchemaConfigu
    * {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
    */
   void snapshot() {
-    this.memstore.snapshot();
+    this.lock.writeLock().lock();
+    try {
+      this.memstore.snapshot();
+    } finally {
+      this.lock.writeLock().unlock();
+    }
   }
 
   /**

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java?rev=1542104&r1=1542103&r2=1542104&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java Thu Nov 14 22:06:34 2013
@@ -298,14 +298,12 @@ public class TestHeapSize extends TestCa
     // MemStore Deep Overhead
     actual = MemStore.DEEP_OVERHEAD;
     expected = ClassSize.estimateBase(cl, false);
-    expected += ClassSize.estimateBase(ReentrantReadWriteLock.class, false);
     expected += ClassSize.estimateBase(AtomicLong.class, false);
     expected += (2 * ClassSize.estimateBase(KeyValueSkipListSet.class, false));
     expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false));
     expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false));
     if(expected != actual) {
       ClassSize.estimateBase(cl, true);
-      ClassSize.estimateBase(ReentrantReadWriteLock.class, true);
       ClassSize.estimateBase(AtomicLong.class, true);
       ClassSize.estimateBase(KeyValueSkipListSet.class, true);
       ClassSize.estimateBase(KeyValueSkipListSet.class, true);