You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2014/03/25 10:47:29 UTC

svn commit: r1581287 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: anoopsamjohn
Date: Tue Mar 25 09:47:28 2014
New Revision: 1581287

URL: http://svn.apache.org/r1581287
Log:
HBASE-10750 Pluggable MemStoreLAB.(Anoop)

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java?rev=1581287&r1=1581286&r2=1581287&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java Tue Mar 25 09:47:28 2014
@@ -40,11 +40,12 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CollectionBackedScanner;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * The MemStore holds in-memory modifications to the Store.  Modifications
@@ -65,10 +66,9 @@ import org.apache.hadoop.hbase.util.Envi
 @InterfaceAudience.Private
 public class DefaultMemStore implements MemStore {
   private static final Log LOG = LogFactory.getLog(DefaultMemStore.class);
-
-  static final String USEMSLAB_KEY =
-    "hbase.hregion.memstore.mslab.enabled";
+  static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
   private static final boolean USEMSLAB_DEFAULT = true;
+  static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class";
 
   private Configuration conf;
 
@@ -94,7 +94,6 @@ public class DefaultMemStore implements 
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
 
-  MemStoreChunkPool chunkPool;
   volatile MemStoreLAB allocator;
   volatile MemStoreLAB snapshotAllocator;
   volatile long snapshotId;
@@ -121,11 +120,11 @@ public class DefaultMemStore implements 
     this.size = new AtomicLong(DEEP_OVERHEAD);
     this.snapshotSize = 0;
     if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) {
-      this.chunkPool = MemStoreChunkPool.getPool(conf);
-      this.allocator = new MemStoreLAB(conf, chunkPool);
+      String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
+      this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
+          new Class[] { Configuration.class }, new Object[] { conf });
     } else {
       this.allocator = null;
-      this.chunkPool = null;
     }
   }
 
@@ -162,7 +161,9 @@ public class DefaultMemStore implements 
         this.snapshotAllocator = this.allocator;
         // Reset allocator so we get a fresh buffer for the new memstore
         if (allocator != null) {
-          this.allocator = new MemStoreLAB(conf, chunkPool);
+          String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName());
+          this.allocator = ReflectionUtils.instantiateWithCustomCtor(className,
+              new Class[] { Configuration.class }, new Object[] { conf });
         }
         timeOfOldestEdit = Long.MAX_VALUE;
       }
@@ -258,15 +259,15 @@ public class DefaultMemStore implements 
     }
 
     int len = kv.getLength();
-    Allocation alloc = allocator.allocateBytes(len);
+    ByteRange alloc = allocator.allocateBytes(len);
     if (alloc == null) {
       // The allocation was too large, allocator decided
       // not to do anything with it.
       return kv;
     }
-    assert alloc.getData() != null;
-    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
-    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
+    assert alloc.getBytes() != null;
+    alloc.put(0, kv.getBuffer(), kv.getOffset(), len);
+    KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len);
     newKv.setMvccVersion(kv.getMvccVersion());
     return newKv;
   }
@@ -987,7 +988,7 @@ public class DefaultMemStore implements 
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));
+      ClassSize.OBJECT + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java?rev=1581287&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemStoreLAB.java Tue Mar 25 09:47:28 2014
@@ -0,0 +1,315 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.SimpleByteRange;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A memstore-local allocation buffer.
+ * <p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
+ * big (2MB by default) byte[] chunks and then doles them out to threads that
+ * request slices into the array.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * regionserver. By ensuring that all KeyValues in a given memstore refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the memstore is flushed.
+ * <p>
+ * Without the MSLAB, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * TODO: we should probably benchmark whether word-aligning the allocations
+ * would provide a performance improvement - probably would speed up the
+ * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
+ * anyway
+ */
+@InterfaceAudience.Private
+public class HeapMemStoreLAB implements MemStoreLAB {
+
+  static final String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
+  static final int CHUNK_SIZE_DEFAULT = 2048 * 1024;
+  static final String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
+  static final int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through
+                                                   // allocator
+
+  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+  // A queue of chunks contained by this memstore
+  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
+  final int chunkSize;
+  final int maxAlloc;
+  private final MemStoreChunkPool chunkPool;
+
+  // This flag marks this instance as closed; it is set when clearing the
+  // snapshot of the memstore
+  private volatile boolean closed = false;
+  // This flag is for reclaiming chunks. It is set when putting chunks back
+  // into the pool
+  private AtomicBoolean reclaimed = new AtomicBoolean(false);
+  // Current count of open scanners that are reading data from this MemStoreLAB
+  private final AtomicInteger openScannerCount = new AtomicInteger();
+
+  // Used in testing
+  public HeapMemStoreLAB() {
+    this(new Configuration());
+  }
+
+  public HeapMemStoreLAB(Configuration conf) {
+    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
+    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
+    this.chunkPool = MemStoreChunkPool.getPool(conf);
+
+    // if we don't exclude allocations >CHUNK_SIZE, we'd loop forever on one!
+    Preconditions.checkArgument(
+      maxAlloc <= chunkSize,
+      MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
+  }
+
+  /**
+   * Allocate a slice of the given length.
+   *
+   * If the size is larger than the maximum size specified for this
+   * allocator, returns null.
+   */
+  @Override
+  public ByteRange allocateBytes(int size) {
+    Preconditions.checkArgument(size >= 0, "negative size");
+
+    // Callers should satisfy large allocations directly from JVM since they
+    // don't cause fragmentation as badly.
+    if (size > maxAlloc) {
+      return null;
+    }
+
+    while (true) {
+      Chunk c = getOrMakeChunk();
+
+      // Try to allocate from this chunk
+      int allocOffset = c.alloc(size);
+      if (allocOffset != -1) {
+        // We succeeded - this is the common case - small alloc
+        // from a big buffer
+        return new SimpleByteRange(c.data, allocOffset, size);
+      }
+
+      // not enough space!
+      // try to retire this chunk
+      tryRetireChunk(c);
+    }
+  }
+
+  /**
+   * Close this instance since it won't be used any more; try to put the chunks
+   * back into the pool
+   */
+  @Override
+  public void close() {
+    this.closed = true;
+    // We can put the chunks back into the pool for reuse only when there is
+    // no open scanner that may still read their data
+    if (chunkPool != null && openScannerCount.get() == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
+   * Called when opening a scanner on the data of this MemStoreLAB
+   */
+  @Override
+  public void incScannerCount() {
+    this.openScannerCount.incrementAndGet();
+  }
+
+  /**
+   * Called when closing a scanner on the data of this MemStoreLAB
+   */
+  @Override
+  public void decScannerCount() {
+    int count = this.openScannerCount.decrementAndGet();
+    if (chunkPool != null && count == 0 && this.closed
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
+   * Try to retire the current chunk if it is still
+   * <code>c</code>. Postcondition is that curChunk.get()
+   * != c
+   */
+  private void tryRetireChunk(Chunk c) {
+    curChunk.compareAndSet(c, null);
+    // If the CAS succeeds, that means that we won the race
+    // to retire the chunk. We could use this opportunity to
+    // update metrics on external fragmentation.
+    //
+    // If the CAS fails, that means that someone else already
+    // retired the chunk for us.
+  }
+
+  /**
+   * Get the current chunk, or, if there is no current chunk, take one from
+   * the chunk pool (or create a new one when there is no chunk pool).
+   */
+  private Chunk getOrMakeChunk() {
+    while (true) {
+      // Try to get the chunk
+      Chunk c = curChunk.get();
+      if (c != null) {
+        return c;
+      }
+
+      // No current chunk, so we want to allocate one. We race
+      // against other allocators to CAS in an uninitialized chunk
+      // (which is cheap to allocate)
+      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
+      if (curChunk.compareAndSet(null, c)) {
+        // we won race - now we need to actually do the expensive
+        // allocation step
+        c.init();
+        this.chunkQueue.add(c);
+        return c;
+      } else if (chunkPool != null) {
+        chunkPool.putbackChunk(c);
+      }
+      // someone else won race - that's fine, we'll try to grab theirs
+      // in the next iteration of the loop.
+    }
+  }
+
+  /**
+   * A chunk of memory out of which allocations are sliced.
+   */
+  static class Chunk {
+    /** Actual underlying data */
+    private byte[] data;
+
+    private static final int UNINITIALIZED = -1;
+    private static final int OOM = -2;
+    /**
+     * Offset for the next allocation, or a sentinel value: UNINITIALIZED (-1)
+     * until init() has completed, OOM (-2) if init() ran out of memory.
+     * */
+    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+    /** Total number of allocations satisfied from this buffer */
+    private AtomicInteger allocCount = new AtomicInteger();
+
+    /** Size of chunk in bytes */
+    private final int size;
+
+    /**
+     * Create an uninitialized chunk. Note that memory is not allocated yet, so
+     * this is cheap.
+     * @param size in bytes
+     */
+    Chunk(int size) {
+      this.size = size;
+    }
+
+    /**
+     * Actually claim the memory for this chunk. This should only be called from
+     * the thread that constructed the chunk. It is thread-safe against other
+     * threads calling alloc(), which will spin until the allocation is complete.
+     */
+    public void init() {
+      assert nextFreeOffset.get() == UNINITIALIZED;
+      try {
+        if (data == null) {
+          data = new byte[size];
+        }
+      } catch (OutOfMemoryError e) {
+        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+        assert failInit; // should be true.
+        throw e;
+      }
+      // Mark that it's ready for use
+      boolean initted = nextFreeOffset.compareAndSet(
+          UNINITIALIZED, 0);
+      // We should always succeed the above CAS since only one thread
+      // calls init()!
+      Preconditions.checkState(initted,
+          "Multiple threads tried to init same chunk");
+    }
+
+    /**
+     * Reset the offset to UNINITIALIZED before reusing an old chunk
+     */
+    void reset() {
+      if (nextFreeOffset.get() != UNINITIALIZED) {
+        nextFreeOffset.set(UNINITIALIZED);
+        allocCount.set(0);
+      }
+    }
+
+    /**
+     * Try to allocate <code>size</code> bytes from the chunk.
+     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
+     */
+    public int alloc(int size) {
+      while (true) {
+        int oldOffset = nextFreeOffset.get();
+        if (oldOffset == UNINITIALIZED) {
+          // The chunk doesn't have its data allocated yet.
+          // Since we found this in curChunk, we know that whoever
+          // CAS-ed it there is allocating it right now. So spin-loop
+          // shouldn't spin long!
+          Thread.yield();
+          continue;
+        }
+        if (oldOffset == OOM) {
+          // doh we ran out of ram. return -1 to chuck this away.
+          return -1;
+        }
+
+        if (oldOffset + size > data.length) {
+          return -1; // alloc doesn't fit
+        }
+
+        // Try to atomically claim this chunk
+        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+          // we got the alloc
+          allocCount.incrementAndGet();
+          return oldOffset;
+        }
+        // we raced and lost alloc, try again
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "Chunk@" + System.identityHashCode(this) +
+        " allocs=" + allocCount.get() + "waste=" +
+        (data.length - nextFreeOffset.get());
+    }
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java?rev=1581287&r1=1581286&r2=1581287&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java Tue Mar 25 09:47:28 2014
@@ -30,13 +30,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk;
+import org.apache.hadoop.hbase.regionserver.HeapMemStoreLAB.Chunk;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * A pool of {@link MemStoreLAB$Chunk} instances.
+ * A pool of {@link HeapMemStoreLAB.Chunk} instances.
  * 
  * MemStoreChunkPool caches a number of retired chunks for reusing, it could
  * decrease allocating bytes when writing, thereby optimizing the garbage
@@ -177,42 +177,39 @@ public class MemStoreChunkPool {
    * @param conf
    * @return the global MemStoreChunkPool instance
    */
-  static synchronized MemStoreChunkPool getPool(Configuration conf) {
+  static MemStoreChunkPool getPool(Configuration conf) {
     if (globalInstance != null) return globalInstance;
     if (chunkPoolDisabled) return null;
 
-
-    float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY,
-        POOL_MAX_SIZE_DEFAULT);
-    if (poolSizePercentage <= 0) {
-      chunkPoolDisabled = true;
-      return null;
-    }
-    if (poolSizePercentage > 1.0) {
-      throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY
-          + " must be between 0.0 and 1.0");
-    }
-    long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
-        .getMax();
-    long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf));
-    int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY,
-        MemStoreLAB.CHUNK_SIZE_DEFAULT);
-    int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
-
-    float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
-        POOL_INITIAL_SIZE_DEFAULT);
-    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
-      throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
-          + " must be between 0.0 and 1.0");
-    }
-
-    int initialCount = (int) (initialCountPercentage * maxCount);
-    LOG.info("Allocating MemStoreChunkPool with chunk size "
-        + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount
-        + ", initial count " + initialCount);
-    globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount,
-        initialCount);
-    return globalInstance;
+    synchronized (MemStoreChunkPool.class) {
+      if (globalInstance != null) return globalInstance;
+      float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, POOL_MAX_SIZE_DEFAULT);
+      if (poolSizePercentage <= 0) {
+        chunkPoolDisabled = true;
+        return null;
+      }
+      if (poolSizePercentage > 1.0) {
+        throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + " must be between 0.0 and 1.0");
+      }
+      long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
+      long globalMemStoreLimit = (long) (heapMax * MemStoreFlusher.getGlobalMemStorePercent(conf));
+      int chunkSize = conf.getInt(HeapMemStoreLAB.CHUNK_SIZE_KEY,
+          HeapMemStoreLAB.CHUNK_SIZE_DEFAULT);
+      int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
+
+      float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY,
+          POOL_INITIAL_SIZE_DEFAULT);
+      if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+        throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY
+            + " must be between 0.0 and 1.0");
+      }
+
+      int initialCount = (int) (initialCountPercentage * maxCount);
+      LOG.info("Allocating MemStoreChunkPool with chunk size " + StringUtils.byteDesc(chunkSize)
+          + ", max count " + maxCount + ", initial count " + initialCount);
+      globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, initialCount);
+      return globalInstance;
+    }
   }
 
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java?rev=1581287&r1=1581286&r2=1581287&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java Tue Mar 25 09:47:28 2014
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,325 +17,47 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.util.ByteRange;
 
 /**
  * A memstore-local allocation buffer.
  * <p>
- * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
- * big (2MB) byte[] chunks from and then doles it out to threads that request
- * slices into the array.
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates big (2MB) chunks
+ * and then doles them out to threads that request slices into the array.
  * <p>
- * The purpose of this class is to combat heap fragmentation in the
- * regionserver. By ensuring that all KeyValues in a given memstore refer
- * only to large chunks of contiguous memory, we ensure that large blocks
- * get freed up when the memstore is flushed.
+ * The purpose of this is to combat heap fragmentation in the regionserver. By ensuring that all
+ * KeyValues in a given memstore refer only to large chunks of contiguous memory, we ensure that
+ * large blocks get freed up when the memstore is flushed.
  * <p>
- * Without the MSLAB, the byte array allocated during insertion end up
- * interleaved throughout the heap, and the old generation gets progressively
- * more fragmented until a stop-the-world compacting collection occurs.
+ * Without the MSLAB, the byte arrays allocated during insertion end up interleaved throughout the
+ * heap, and the old generation gets progressively more fragmented until a stop-the-world compacting
+ * collection occurs.
  * <p>
- * TODO: we should probably benchmark whether word-aligning the allocations
- * would provide a performance improvement - probably would speed up the
- * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
- * anyway
  */
 @InterfaceAudience.Private
-public class MemStoreLAB {
-  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
-  // A queue of chunks contained by this memstore
-  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
-
-  final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
-  final static int CHUNK_SIZE_DEFAULT = 2048 * 1024;
-  final int chunkSize;
-
-  final static String MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
-  final static int MAX_ALLOC_DEFAULT = 256  * 1024; // allocs bigger than this don't go through allocator
-  final int maxAlloc;
-
-  private final MemStoreChunkPool chunkPool;
-
-  // This flag is for closing this instance, its set when clearing snapshot of
-  // memstore
-  private volatile boolean closed = false;
-  // This flag is for reclaiming chunks. Its set when putting chunks back to
-  // pool
-  private AtomicBoolean reclaimed = new AtomicBoolean(false);
-  // Current count of open scanners which reading data from this MemStoreLAB
-  private final AtomicInteger openScannerCount = new AtomicInteger();
-
-  // Used in testing
-  public MemStoreLAB() {
-    this(new Configuration());
-  }
-
-  private MemStoreLAB(Configuration conf) {
-    this(conf, MemStoreChunkPool.getPool(conf));
-  }
-
-  public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
-    chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT);
-    maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT);
-    this.chunkPool = pool;
-
-    // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
-    Preconditions.checkArgument(
-      maxAlloc <= chunkSize,
-      MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY);
-  }
+public interface MemStoreLAB {
 
   /**
-   * Allocate a slice of the given length.
-   *
-   * If the size is larger than the maximum size specified for this
-   * allocator, returns null.
+   * Allocate a slice of the given length. If the size is larger than the maximum size specified for
+   * this allocator, returns null.
+   * @param size number of bytes to allocate
+   * @return a {@link ByteRange} over the allocated slice, or null if size exceeds the maximum
    */
-  public Allocation allocateBytes(int size) {
-    Preconditions.checkArgument(size >= 0, "negative size");
-
-    // Callers should satisfy large allocations directly from JVM since they
-    // don't cause fragmentation as badly.
-    if (size > maxAlloc) {
-      return null;
-    }
-
-    while (true) {
-      Chunk c = getOrMakeChunk();
-
-      // Try to allocate from this chunk
-      int allocOffset = c.alloc(size);
-      if (allocOffset != -1) {
-        // We succeeded - this is the common case - small alloc
-        // from a big buffer
-        return new Allocation(c.data, allocOffset);
-      }
-
-      // not enough space!
-      // try to retire this chunk
-      tryRetireChunk(c);
-    }
-  }
+  ByteRange allocateBytes(int size);
 
   /**
-   * Close this instance since it won't be used any more, try to put the chunks
-   * back to pool
+   * Close this instance since it won't be used any more; try to put the chunks back into the pool
    */
-  void close() {
-    this.closed = true;
-    // We could put back the chunks to pool for reusing only when there is no
-    // opening scanner which will read their data
-    if (chunkPool != null && openScannerCount.get() == 0
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.chunkQueue);
-    }
-  }
+  void close();
 
   /**
    * Called when opening a scanner on the data of this MemStoreLAB
    */
-  void incScannerCount() {
-    this.openScannerCount.incrementAndGet();
-  }
+  void incScannerCount();
 
   /**
    * Called when closing a scanner on the data of this MemStoreLAB
    */
-  void decScannerCount() {
-    int count = this.openScannerCount.decrementAndGet();
-    if (chunkPool != null && count == 0 && this.closed
-        && reclaimed.compareAndSet(false, true)) {
-      chunkPool.putbackChunks(this.chunkQueue);
-    }
-  }
-
-  /**
-   * Try to retire the current chunk if it is still
-   * <code>c</code>. Postcondition is that curChunk.get()
-   * != c
-   */
-  private void tryRetireChunk(Chunk c) {
-    curChunk.compareAndSet(c, null);
-    // If the CAS succeeds, that means that we won the race
-    // to retire the chunk. We could use this opportunity to
-    // update metrics on external fragmentation.
-    //
-    // If the CAS fails, that means that someone else already
-    // retired the chunk for us.
-  }
-
-  /**
-   * Get the current chunk, or, if there is no current chunk,
-   * allocate a new one from the JVM.
-   */
-  private Chunk getOrMakeChunk() {
-    while (true) {
-      // Try to get the chunk
-      Chunk c = curChunk.get();
-      if (c != null) {
-        return c;
-      }
-
-      // No current chunk, so we want to allocate one. We race
-      // against other allocators to CAS in an uninitialized chunk
-      // (which is cheap to allocate)
-      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
-      if (curChunk.compareAndSet(null, c)) {
-        // we won race - now we need to actually do the expensive
-        // allocation step
-        c.init();
-        this.chunkQueue.add(c);
-        return c;
-      } else if (chunkPool != null) {
-        chunkPool.putbackChunk(c);
-      }
-      // someone else won race - that's fine, we'll try to grab theirs
-      // in the next iteration of the loop.
-    }
-  }
-
-  /**
-   * A chunk of memory out of which allocations are sliced.
-   */
-  static class Chunk {
-    /** Actual underlying data */
-    private byte[] data;
-
-    private static final int UNINITIALIZED = -1;
-    private static final int OOM = -2;
-    /**
-     * Offset for the next allocation, or the sentinel value -1
-     * which implies that the chunk is still uninitialized.
-     * */
-    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
-
-    /** Total number of allocations satisfied from this buffer */
-    private AtomicInteger allocCount = new AtomicInteger();
-
-    /** Size of chunk in bytes */
-    private final int size;
-
-    /**
-     * Create an uninitialized chunk. Note that memory is not allocated yet, so
-     * this is cheap.
-     * @param size in bytes
-     */
-    Chunk(int size) {
-      this.size = size;
-    }
-
-    /**
-     * Actually claim the memory for this chunk. This should only be called from
-     * the thread that constructed the chunk. It is thread-safe against other
-     * threads calling alloc(), who will block until the allocation is complete.
-     */
-    public void init() {
-      assert nextFreeOffset.get() == UNINITIALIZED;
-      try {
-        if (data == null) {
-          data = new byte[size];
-        }
-      } catch (OutOfMemoryError e) {
-        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
-        assert failInit; // should be true.
-        throw e;
-      }
-      // Mark that it's ready for use
-      boolean initted = nextFreeOffset.compareAndSet(
-          UNINITIALIZED, 0);
-      // We should always succeed the above CAS since only one thread
-      // calls init()!
-      Preconditions.checkState(initted,
-          "Multiple threads tried to init same chunk");
-    }
-
-    /**
-     * Reset the offset to UNINITIALIZED before before reusing an old chunk
-     */
-    void reset() {
-      if (nextFreeOffset.get() != UNINITIALIZED) {
-        nextFreeOffset.set(UNINITIALIZED);
-        allocCount.set(0);
-      }
-    }
-
-    /**
-     * Try to allocate <code>size</code> bytes from the chunk.
-     * @return the offset of the successful allocation, or -1 to indicate not-enough-space
-     */
-    public int alloc(int size) {
-      while (true) {
-        int oldOffset = nextFreeOffset.get();
-        if (oldOffset == UNINITIALIZED) {
-          // The chunk doesn't have its data allocated yet.
-          // Since we found this in curChunk, we know that whoever
-          // CAS-ed it there is allocating it right now. So spin-loop
-          // shouldn't spin long!
-          Thread.yield();
-          continue;
-        }
-        if (oldOffset == OOM) {
-          // doh we ran out of ram. return -1 to chuck this away.
-          return -1;
-        }
-
-        if (oldOffset + size > data.length) {
-          return -1; // alloc doesn't fit
-        }
-
-        // Try to atomically claim this chunk
-        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
-          // we got the alloc
-          allocCount.incrementAndGet();
-          return oldOffset;
-        }
-        // we raced and lost alloc, try again
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "Chunk@" + System.identityHashCode(this) +
-        " allocs=" + allocCount.get() + "waste=" +
-        (data.length - nextFreeOffset.get());
-    }
-  }
-
-  /**
-   * The result of a single allocation. Contains the chunk that the
-   * allocation points into, and the offset in this array where the
-   * slice begins.
-   */
-  public static class Allocation {
-    private final byte[] data;
-    private final int offset;
-
-    private Allocation(byte[] data, int off) {
-      this.data = data;
-      this.offset = off;
-    }
-
-    @Override
-    public String toString() {
-      return "Allocation(" + "capacity=" + data.length + ", off=" + offset
-          + ")";
-    }
-
-    byte[] getData() {
-      return data;
-    }
-
-    int getOffset() {
-      return offset;
-    }
-  }
+  void decScannerCount();
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java?rev=1581287&r1=1581286&r2=1581287&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java Tue Mar 25 09:47:28 2014
@@ -28,7 +28,7 @@ import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -68,21 +68,21 @@ public class TestMemStoreChunkPool {
   @Test
   public void testReusingChunks() {
     Random rand = new Random();
-    MemStoreLAB mslab = new MemStoreLAB(conf, chunkPool);
+    MemStoreLAB mslab = new HeapMemStoreLAB(conf);
     int expectedOff = 0;
     byte[] lastBuffer = null;
     // Randomly allocate some bytes
     for (int i = 0; i < 100; i++) {
       int size = rand.nextInt(1000);
-      Allocation alloc = mslab.allocateBytes(size);
+      ByteRange alloc = mslab.allocateBytes(size);
 
-      if (alloc.getData() != lastBuffer) {
+      if (alloc.getBytes() != lastBuffer) {
         expectedOff = 0;
-        lastBuffer = alloc.getData();
+        lastBuffer = alloc.getBytes();
       }
       assertEquals(expectedOff, alloc.getOffset());
-      assertTrue("Allocation " + alloc + " overruns buffer", alloc.getOffset()
-          + size <= alloc.getData().length);
+      assertTrue("Allocation overruns buffer", alloc.getOffset()
+          + size <= alloc.getBytes().length);
       expectedOff += size;
     }
     // chunks will be put back to pool after close
@@ -90,7 +90,7 @@ public class TestMemStoreChunkPool {
     int chunkCount = chunkPool.getPoolSize();
     assertTrue(chunkCount > 0);
     // reconstruct mslab
-    mslab = new MemStoreLAB(conf, chunkPool);
+    mslab = new HeapMemStoreLAB(conf);
     // chunk should be got from the pool, so we can reuse it.
     mslab.allocateBytes(1000);
     assertEquals(chunkCount - 1, chunkPool.getPoolSize());

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java?rev=1581287&r1=1581286&r2=1581287&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java Tue Mar 25 09:47:28 2014
@@ -29,7 +29,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.SmallTests;
-import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.junit.Test;
 
 import com.google.common.collect.Iterables;
@@ -47,7 +47,7 @@ public class TestMemStoreLAB {
   @Test
   public void testLABRandomAllocation() {
     Random rand = new Random();
-    MemStoreLAB mslab = new MemStoreLAB();
+    MemStoreLAB mslab = new HeapMemStoreLAB();
     int expectedOff = 0;
     byte[] lastBuffer = null;
     // 100K iterations by 0-1K alloc -> 50MB expected
@@ -55,23 +55,23 @@ public class TestMemStoreLAB {
     // behavior
     for (int i = 0; i < 100000; i++) {
       int size = rand.nextInt(1000);
-      Allocation alloc = mslab.allocateBytes(size);
+      ByteRange alloc = mslab.allocateBytes(size);
       
-      if (alloc.getData() != lastBuffer) {
+      if (alloc.getBytes() != lastBuffer) {
         expectedOff = 0;
-        lastBuffer = alloc.getData();
+        lastBuffer = alloc.getBytes();
       }
       assertEquals(expectedOff, alloc.getOffset());
-      assertTrue("Allocation " + alloc + " overruns buffer",
-          alloc.getOffset() + size <= alloc.getData().length);
+      assertTrue("Allocation overruns buffer",
+          alloc.getOffset() + size <= alloc.getBytes().length);
       expectedOff += size;
     }
   }
 
   @Test
   public void testLABLargeAllocation() {
-    MemStoreLAB mslab = new MemStoreLAB();
-    Allocation alloc = mslab.allocateBytes(2*1024*1024);
+    MemStoreLAB mslab = new HeapMemStoreLAB();
+    ByteRange alloc = mslab.allocateBytes(2*1024*1024);
     assertNull("2MB allocation shouldn't be satisfied by LAB.",
       alloc);
   } 
@@ -88,7 +88,7 @@ public class TestMemStoreLAB {
     
     final AtomicInteger totalAllocated = new AtomicInteger();
     
-    final MemStoreLAB mslab = new MemStoreLAB();
+    final MemStoreLAB mslab = new HeapMemStoreLAB();
     List<List<AllocRecord>> allocations = Lists.newArrayList();
     
     for (int i = 0; i < 10; i++) {
@@ -100,7 +100,7 @@ public class TestMemStoreLAB {
         @Override
         public void doAnAction() throws Exception {
           int size = r.nextInt(1000);
-          Allocation alloc = mslab.allocateBytes(size);
+          ByteRange alloc = mslab.allocateBytes(size);
           totalAllocated.addAndGet(size);
           allocsByThisThread.add(new AllocRecord(alloc, size));
         }
@@ -125,10 +125,10 @@ public class TestMemStoreLAB {
       if (rec.size == 0) continue;
       
       Map<Integer, AllocRecord> mapForThisByteArray =
-        mapsByChunk.get(rec.alloc.getData());
+        mapsByChunk.get(rec.alloc.getBytes());
       if (mapForThisByteArray == null) {
         mapForThisByteArray = Maps.newTreeMap();
-        mapsByChunk.put(rec.alloc.getData(), mapForThisByteArray);
+        mapsByChunk.put(rec.alloc.getBytes(), mapForThisByteArray);
       }
       AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec);
       assertNull("Already had an entry " + oldVal + " for allocation " + rec,
@@ -141,8 +141,8 @@ public class TestMemStoreLAB {
       int expectedOff = 0;
       for (AllocRecord alloc : allocsInChunk.values()) {
         assertEquals(expectedOff, alloc.alloc.getOffset());
-        assertTrue("Allocation " + alloc + " overruns buffer",
-            alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getData().length);
+        assertTrue("Allocation overruns buffer",
+            alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getBytes().length);
         expectedOff += alloc.size;
       }
     }
@@ -150,9 +150,9 @@ public class TestMemStoreLAB {
   }
   
   private static class AllocRecord implements Comparable<AllocRecord>{
-    private final Allocation alloc;
+    private final ByteRange alloc;
     private final int size;
-    public AllocRecord(Allocation alloc, int size) {
+    public AllocRecord(ByteRange alloc, int size) {
       super();
       this.alloc = alloc;
       this.size = size;
@@ -160,7 +160,7 @@ public class TestMemStoreLAB {
 
     @Override
     public int compareTo(AllocRecord e) {
-      if (alloc.getData() != e.alloc.getData()) {
+      if (alloc.getBytes() != e.alloc.getBytes()) {
         throw new RuntimeException("Can only compare within a particular array");
       }
       return Ints.compare(alloc.getOffset(), e.alloc.getOffset());
@@ -168,7 +168,7 @@ public class TestMemStoreLAB {
     
     @Override
     public String toString() {
-      return "AllocRecord(alloc=" + alloc + ", size=" + size + ")";
+      return "AllocRecord(offset=" + alloc.getOffset() + ", size=" + size + ")";
     }
     
   }