You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/06/30 20:18:29 UTC

svn commit: r1498167 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Sun Jun 30 18:18:28 2013
New Revision: 1498167

URL: http://svn.apache.org/r1498167
Log:
[HBASE-8837] Initial commit of SLAB allocator for the MemStore.

Author: shaneh

Summary:
Slab allocator for HBase MemStore. Copies KeyValues into large
on-heap byte[] chunks (optionally recycled through a global chunk
pool) to address the heap-fragmentation issue that causes
stop-the-world garbage collections in the JVM.

Test Plan: Tested on HBASEDEV139 using YCSB to generate load.

Reviewers: liyintang, rshroff, manukranthk, adela

Reviewed By: manukranthk

CC: hbase-eng@, arice

Differential Revision: https://phabricator.fb.com/D835989

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sun Jun 30 18:18:28 2013
@@ -790,6 +790,26 @@ public final class HConstants {
   public static final String HBASE_ENABLE_QOS_KEY = "hbase.enable.qos";
   public static final String HBASE_ENABLE_SYNCFILERANGE_THROTTLING_KEY = "hbase.enable.syncfilerange.throttling";
 
+  /*
+   * MSLAB Constants
+   */
+  public final static String MSLAB_CHUNK_POOL_MAX_SIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize";
+  public final static String MSLAB_CHUNK_POOL_INITIAL_SIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize";
+  public final static float MSLAB_POOL_MAX_SIZE_DEFAULT = 0.0f;
+  public final static float MSLAB_POOL_INITIAL_SIZE_DEFAULT = 0.0f;
+
+  public final static String MSLAB_CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize";
+  public final static int MSLAB_CHUNK_SIZE_DEFAULT = 2 * 1024 * 1024;
+
+  public final static String MSLAB_MAX_ALLOC_KEY = "hbase.hregion.memstore.mslab.max.allocation";
+  public final static int MSLAB_MAX_ALLOC_DEFAULT = 256  * 1024; // allocs bigger than this don't go through allocator
+
+  public final static String MSLAB_MAX_SIZE_KEY = "hbase.hregion.memstore.mslab.max.size";
+  public final static float MSLAB_MAX_SIZE_DEFAULT = 1.25f; // Stop using SLAB if larger than this percentage of memstore size
+
+  public final static float MSLAB_PCT_LOWER_LIMIT = 0.0f;
+  public final static float MSLAB_PCT_UPPER_LIMIT = 2.0f;
+
   private HConstants() {
     // Can't be instantiated with this constructor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Sun Jun 30 18:18:28 2013
@@ -33,6 +33,8 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
@@ -55,6 +57,12 @@ import org.apache.hadoop.hbase.util.Coll
 public class MemStore implements HeapSize {
   private static final Log LOG = LogFactory.getLog(MemStore.class);
 
+  static final String USE_MSLAB_KEY =
+      "hbase.hregion.memstore.mslab.enabled";
+  private static final boolean USE_MSLAB_DEFAULT = false;
+
+  private Configuration conf;
+
   // MemStore.  Use a KeyValueSkipListSet rather than SkipListSet because of the
   // better semantics.  The Map will overwrite if passed a key it already had
   // whereas the Set will not add new KV if key is same though value might be
@@ -84,18 +92,28 @@ public class MemStore implements HeapSiz
   TimeRangeTracker timeRangeTracker;
   TimeRangeTracker snapshotTimeRangeTracker;
 
+  MemStoreChunkPool chunkPool;
+  volatile MemStoreLAB allocator;
+  volatile MemStoreLAB snapshotAllocator;
+
+  // Keep track of the total size of KVs not just the
+  // bytes stored in the MSLAB
+  private final AtomicLong successfullyAllocatedKvBytes;
+
   /**
    * Default constructor. Used for tests.
    */
   public MemStore() {
-    this(KeyValue.COMPARATOR);
+    this(HBaseConfiguration.create(), KeyValue.COMPARATOR);
   }
 
   /**
    * Constructor.
    * @param c Comparator
    */
-  public MemStore(final KeyValue.KVComparator c) {
+  public MemStore(final Configuration conf,
+      final KeyValue.KVComparator c) {
+    this.conf = conf;
     this.comparator = c;
     this.comparatorIgnoreTimestamp =
       this.comparator.getComparatorIgnoringTimestamps();
@@ -107,6 +125,17 @@ public class MemStore implements HeapSiz
     this.size = new AtomicLong(DEEP_OVERHEAD);
     this.numDeletesInKvSet = new AtomicLong(0);
     this.numDeletesInSnapshot = new AtomicLong(0);
+
+    this.successfullyAllocatedKvBytes = new AtomicLong(0);
+
+    if (conf.getBoolean(USE_MSLAB_KEY, USE_MSLAB_DEFAULT)) {
+      this.chunkPool = MemStoreChunkPool.getPool(conf);
+      this.allocator = new MemStoreLAB(conf, chunkPool);
+    } else {
+      this.allocator = null;
+      this.chunkPool = null;
+    }
+
   }
 
   void dump() {
@@ -137,6 +166,13 @@ public class MemStore implements HeapSiz
           this.kvset = new KeyValueSkipListSet(this.comparator);
           this.snapshotTimeRangeTracker = this.timeRangeTracker;
           this.timeRangeTracker = new TimeRangeTracker();
+
+          this.snapshotAllocator = this.allocator;
+          // Reset allocator so we get a fresh buffer for the new memstore
+          if (allocator != null) {
+            this.allocator = new MemStoreLAB(conf, chunkPool);
+          }
+
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
           this.numDeletesInSnapshot = numDeletesInKvSet;
@@ -168,6 +204,7 @@ public class MemStore implements HeapSiz
    */
   void clearSnapshot(final SortedSet<KeyValue> ss)
   throws UnexpectedException {
+    MemStoreLAB tmpAllocator = null;
     this.lock.writeLock().lock();
     try {
       if (this.snapshot != ss) {
@@ -180,9 +217,16 @@ public class MemStore implements HeapSiz
         this.snapshot = new KeyValueSkipListSet(this.comparator);
         this.snapshotTimeRangeTracker = new TimeRangeTracker();
       }
+      if (this.snapshotAllocator != null) {
+        tmpAllocator = this.snapshotAllocator;
+        this.snapshotAllocator = null;
+      }
     } finally {
       this.lock.writeLock().unlock();
     }
+    if (tmpAllocator != null) {
+      tmpAllocator.close();
+    }
   }
 
   /**
@@ -194,10 +238,11 @@ public class MemStore implements HeapSiz
     long s = -1;
     this.lock.readLock().lock();
     try {
-      s = heapSizeChange(kv, this.kvset.add(kv));
-      timeRangeTracker.includeTimestamp(kv);
+      KeyValue toAdd = maybeCloneWithAllocator(kv);
+      s = heapSizeChange(toAdd, this.kvset.add(toAdd));
+      timeRangeTracker.includeTimestamp(toAdd);
       this.size.addAndGet(s);
-      if (kv.isDelete()) {
+      if (toAdd.isDelete()) {
         this.numDeletesInKvSet.incrementAndGet();
       }
     } finally {
@@ -206,25 +251,33 @@ public class MemStore implements HeapSiz
     return s;
   }
 
+  private KeyValue maybeCloneWithAllocator(KeyValue kv) {
+    if (allocator == null) {
+      return kv;
+    }
+
+    int len = kv.getLength();
+    MemStoreLAB.Allocation alloc = allocator.allocateBytes(len);
+    if (alloc == null) {
+      // The allocator decided not to do anything.
+      return kv;
+    }
+    System.arraycopy(kv.getBuffer(), kv.getOffset(), alloc.getData(), alloc.getOffset(), len);
+    KeyValue newKv = new KeyValue(alloc.getData(), alloc.getOffset(), len);
+    newKv.setMemstoreTS(kv.getMemstoreTS());
+    this.successfullyAllocatedKvBytes.addAndGet(newKv.heapSize());
+    return newKv;
+
+  }
+
+
   /**
    * Write a delete
    * @param delete
    * @return approximate size of the passed key and value.
    */
   long delete(final KeyValue delete) {
-    long s = 0;
-    this.lock.readLock().lock();
-    try {
-      s += heapSizeChange(delete, this.kvset.add(delete));
-      timeRangeTracker.includeTimestamp(delete);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-    if (delete.isDelete()) {
-      this.numDeletesInKvSet.incrementAndGet();
-    }
-    this.size.addAndGet(s);
-    return s;
+    return add(delete);
   }
 
   /**
@@ -524,6 +577,10 @@ public class MemStore implements HeapSiz
     volatile KeyValueSkipListSet kvsetAtCreation;
     volatile KeyValueSkipListSet snapshotAtCreation;
 
+    // The allocator and snapshot allocator at the time of creating this scanner
+    volatile MemStoreLAB allocatorAtCreation;
+    volatile MemStoreLAB snapshotAllocatorAtCreation;
+
     /*
     Some notes...
 
@@ -548,6 +605,15 @@ public class MemStore implements HeapSiz
       kvsetAtCreation = kvset;
       snapshotAtCreation = snapshot;
 
+      if (allocator != null) {
+        this.allocatorAtCreation = allocator;
+        this.allocatorAtCreation.incScannerCount();
+      }
+      if (snapshotAllocator != null) {
+        this.snapshotAllocatorAtCreation = snapshotAllocator;
+        this.snapshotAllocatorAtCreation.incScannerCount();
+      }
+
       //DebugPrint.println(" MS new@" + hashCode());
     }
 
@@ -664,6 +730,15 @@ public class MemStore implements HeapSiz
 
       this.kvsetIt = null;
       this.snapshotIt = null;
+
+      if (allocatorAtCreation != null) {
+        this.allocatorAtCreation.decScannerCount();
+        this.allocatorAtCreation = null;
+      }
+      if (snapshotAllocatorAtCreation != null) {
+        this.snapshotAllocatorAtCreation.decScannerCount();
+        this.snapshotAllocatorAtCreation = null;
+      }
     }
 
     /**
@@ -719,7 +794,7 @@ public class MemStore implements HeapSiz
   }
 
   /**
-   * Get the entire heap usage for this MemStore not including keys in the
+   * Get the entire heap usage for this MemStore not including keys in the
    * snapshot.
    */
   @Override
@@ -728,7 +803,7 @@ public class MemStore implements HeapSiz
   }
 
   /**
-   * Get the heap usage of KVs in this MemStore.
+   * Get the heap usage of KVs in this MemStore.
    */
   public long keySize() {
     return heapSize() - DEEP_OVERHEAD;

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java?rev=1498167&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java Sun Jun 30 18:18:28 2013
@@ -0,0 +1,216 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A pool of {@link MemStoreLAB.Chunk } instances.
+ *
+ * MemStoreChunkPool caches a number of retired chunks for reusing, it could
+ * decrease allocating bytes when writing, thereby optimizing the garbage
+ * collection on JVM.
+ *
+ * The pool instance is globally unique and could be obtained through
+ * {@link MemStoreChunkPool#getPool(Configuration)}
+ *
+ * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB needs a new
+ * chunk to allocate from, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)}
+ * is called when the MemStore clears its snapshot after a flush
+ */
+@InterfaceAudience.Private
+public class MemStoreChunkPool {
+  private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class);
+
+  // Static reference to the MemStoreChunkPool
+  private static MemStoreChunkPool globalInstance;
+  /** Boolean whether we have disabled the memstore chunk pool entirely. */
+  static boolean chunkPoolDisabled = false;
+
+  private final int maxCount;
+
+  // A queue of reclaimed chunks
+  private final BlockingQueue<Chunk> reclaimedChunks;
+  private final int chunkSize;
+
+  /** Statistics thread schedule pool */
+  private final ScheduledExecutorService scheduleThreadPool;
+  /** Statistics thread */
+  private static final int statThreadPeriod = 60 * 5;
+  private AtomicLong createdChunkCount = new AtomicLong();
+  private AtomicLong reusedChunkCount = new AtomicLong();
+
+  MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount,
+                    int initialCount) {
+    this.maxCount = maxCount;
+    this.chunkSize = chunkSize;
+    this.reclaimedChunks = new LinkedBlockingQueue<Chunk>();
+    for (int i = 0; i < initialCount; i++) {
+      Chunk chunk = new Chunk(chunkSize);
+      chunk.init();
+      reclaimedChunks.add(chunk);
+    }
+    final String n = Thread.currentThread().getName();
+    scheduleThreadPool = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics")
+            .setDaemon(true).build());
+    this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
+        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Poll a chunk from the pool, reset it if not null, else create a new chunk
+   * to return
+   * @return a chunk
+   */
+  Chunk getChunk() {
+    Chunk chunk = reclaimedChunks.poll();
+    if (chunk == null) {
+      chunk = new Chunk(chunkSize);
+      createdChunkCount.incrementAndGet();
+    } else {
+      chunk.reset();
+      reusedChunkCount.incrementAndGet();
+    }
+    return chunk;
+  }
+
+  /**
+   * Add the chunks to the pool, when the pool achieves the max size, it will
+   * skip the remaining chunks
+   * @param chunks
+   */
+  void putbackChunks(BlockingQueue<Chunk> chunks) {
+    int maxNumToPutback = this.maxCount - reclaimedChunks.size();
+    if (maxNumToPutback <= 0) {
+      return;
+    }
+    chunks.drainTo(reclaimedChunks, maxNumToPutback);
+  }
+
+  /**
+   * Add the chunk to the pool, if the pool has achieved the max size, it will
+   * skip it
+   * @param chunk
+   */
+  void putbackChunk(Chunk chunk) {
+    if (reclaimedChunks.size() >= this.maxCount) {
+      return;
+    }
+    reclaimedChunks.add(chunk);
+  }
+
+  int getPoolSize() {
+    return this.reclaimedChunks.size();
+  }
+
+  /*
+   * Only used in testing
+   */
+  void clearChunks() {
+    this.reclaimedChunks.clear();
+  }
+
+  private static class StatisticsThread extends Thread {
+    MemStoreChunkPool mcp;
+
+    public StatisticsThread(MemStoreChunkPool mcp) {
+      super("MemStoreChunkPool.StatisticsThread");
+      setDaemon(true);
+      this.mcp = mcp;
+    }
+
+    @Override
+    public void run() {
+      mcp.logStats();
+    }
+  }
+
+  private void logStats() {
+    if (!LOG.isDebugEnabled()) return;
+    long created = createdChunkCount.get();
+    long reused = reusedChunkCount.get();
+    long total = created + reused;
+    LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+        + ",created chunk count=" + created
+        + ",reused chunk count=" + reused
+        + ",reuseRatio=" + (total == 0 ? "0" : StringUtils.formatPercent(
+        (float) reused / (float) total, 2)));
+  }
+
+  /**
+   * @param conf
+   * @return the global MemStoreChunkPool instance
+   */
+  static synchronized MemStoreChunkPool getPool(Configuration conf) {
+    if (globalInstance != null) return globalInstance;
+    if (chunkPoolDisabled) return null;
+
+
+    float poolSizePercentage = conf.getFloat(HConstants.MSLAB_CHUNK_POOL_MAX_SIZE_KEY,
+        HConstants.MSLAB_POOL_MAX_SIZE_DEFAULT);
+    if (poolSizePercentage <= 0) {
+      chunkPoolDisabled = true;
+      return null;
+    }
+    if (poolSizePercentage > 1.0) {
+      throw new IllegalArgumentException(HConstants.MSLAB_CHUNK_POOL_MAX_SIZE_KEY
+          + " must be between 0.0 and 1.0");
+    }
+    long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+        .getMax();
+    long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax,
+        MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf);
+    int chunkSize = conf.getInt(HConstants.MSLAB_CHUNK_SIZE_KEY,
+        HConstants.MSLAB_CHUNK_SIZE_DEFAULT);
+    int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize);
+
+    float initialCountPercentage = conf.getFloat(HConstants.MSLAB_CHUNK_POOL_INITIAL_SIZE_KEY,
+        HConstants.MSLAB_POOL_INITIAL_SIZE_DEFAULT);
+    if (initialCountPercentage > 1.0 || initialCountPercentage < 0) {
+      throw new IllegalArgumentException(HConstants.MSLAB_CHUNK_POOL_INITIAL_SIZE_KEY
+          + " must be between 0.0 and 1.0");
+    }
+
+    int initialCount = (int) (initialCountPercentage * maxCount);
+    LOG.info("Allocating MemStoreChunkPool with chunk size "
+        + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount
+        + ", initial count " + initialCount);
+    globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount,
+        initialCount);
+    return globalInstance;
+  }
+
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Sun Jun 30 18:18:28 2013
@@ -68,9 +68,9 @@ class MemStoreFlusher implements FlushRe
   protected final long globalMemStoreLimit;
   protected final long globalMemStoreLimitLowMark;
 
-  private static final float DEFAULT_UPPER = 0.4f;
+  protected static final float DEFAULT_UPPER = 0.4f;
   private static final float DEFAULT_LOWER = 0.25f;
-  private static final String UPPER_KEY =
+  protected static final String UPPER_KEY =
     "hbase.regionserver.global.memstore.upperLimit";
   private static final String LOWER_KEY =
     "hbase.regionserver.global.memstore.lowerLimit";

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java?rev=1498167&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java Sun Jun 30 18:18:28 2013
@@ -0,0 +1,389 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+/**
+ * A memstore-local allocation buffer.
+ * <p>
+ * The MemStoreLAB is basically a bump-the-pointer allocator that allocates
+ * big (2MB by default) byte[] chunks and then doles them out to threads that request
+ * slices into the array.
+ * <p>
+ * The purpose of this class is to combat heap fragmentation in the
+ * regionserver. By ensuring that all KeyValues in a given memstore refer
+ * only to large chunks of contiguous memory, we ensure that large blocks
+ * get freed up when the memstore is flushed.
+ * <p>
+ * Without the MSLAB, the byte arrays allocated during insertion end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p>
+ * TODO: we should probably benchmark whether word-aligning the allocations
+ * would provide a performance improvement - probably would speed up the
+ * Bytes.toLong/Bytes.toInt calls in KeyValue, but some of those are cached
+ * anyway
+ */
+@InterfaceAudience.Private
+public class MemStoreLAB {
+  private static final Log LOG = LogFactory.getLog(MemStoreLAB.class);
+  private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+  // A queue of chunks contained by this memstore
+  private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>();
+
+  final int chunkSize;
+  final int maxAlloc;
+  final long slabLimit;
+
+  private final MemStoreChunkPool chunkPool;
+
+  // This flag is for closing this instance, its set when clearing snapshot of
+  // memstore
+  private volatile boolean closed = false;
+  // This flag is for reclaiming chunks. Its set when putting chunks back to
+  // pool
+  private AtomicBoolean reclaimed = new AtomicBoolean(false);
+  // Current count of open scanners which reading data from this MemStoreLAB
+  private final AtomicInteger openScannerCount = new AtomicInteger();
+
+  // Keep track of how many bytes in total have been set aside by the MemStoreLAB
+  private final AtomicLong totalSLABAllocatedBytes = new AtomicLong(0l);
+
+  // Keep track of how many successful allocations
+  private final AtomicLong numSuccessfulAllocations = new AtomicLong(0);
+  // Keep track of how many bytes have been used
+  private final AtomicLong successfullyAllocatedBytes = new AtomicLong(0);
+  // Keep track of the number of failed allocations
+  private final AtomicLong numDeniedAllocations = new AtomicLong(0);
+  // Keep track of the number of failed allocation bytes
+  private final AtomicLong deniedAllocationBytes = new AtomicLong(0);
+
+  // Used in testing
+  public MemStoreLAB() {
+    this(new Configuration());
+  }
+
+  private MemStoreLAB(Configuration conf) {
+    this(conf, MemStoreChunkPool.getPool(conf));
+  }
+
+  public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) {
+    chunkSize = conf.getInt(HConstants.MSLAB_CHUNK_SIZE_KEY, HConstants.MSLAB_CHUNK_SIZE_DEFAULT);
+    maxAlloc = conf.getInt(HConstants.MSLAB_MAX_ALLOC_KEY, HConstants.MSLAB_MAX_ALLOC_DEFAULT);
+    float slabLimitPct = conf.getFloat(HConstants.MSLAB_MAX_SIZE_KEY, HConstants.MSLAB_MAX_SIZE_DEFAULT);
+    if (slabLimitPct < HConstants.MSLAB_PCT_LOWER_LIMIT || slabLimitPct > HConstants.MSLAB_PCT_UPPER_LIMIT) {
+      LOG.warn(HConstants.MSLAB_MAX_SIZE_KEY + " must be in the range " + HConstants.MSLAB_PCT_LOWER_LIMIT +
+          " to " + HConstants.MSLAB_PCT_UPPER_LIMIT + " inclusive.");
+      slabLimitPct = HConstants.MSLAB_MAX_SIZE_DEFAULT;
+    }
+
+    this.chunkPool = pool;
+
+    slabLimit = (long) (slabLimitPct * conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
+        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE));
+
+    // if we don't exclude allocations >CHUNK_SIZE, we'd infinite loop on one!
+    Preconditions.checkArgument(
+        maxAlloc <= chunkSize,
+        HConstants.MSLAB_MAX_ALLOC_KEY + " must be less than " + HConstants.MSLAB_CHUNK_SIZE_KEY);
+  }
+
+  /**
+   * Allocate a slice of the given length.
+   *
+   * If the size is larger than the maximum size specified for this
+   * allocator, returns null.
+   */
+  protected Allocation allocateBytes(int size) {
+    Preconditions.checkArgument(size >= 0, "negative size");
+
+    // Callers should satisfy large allocations directly from JVM since they
+    // don't cause fragmentation as badly.
+    if (size > maxAlloc) {
+      this.numDeniedAllocations.incrementAndGet();
+      this.deniedAllocationBytes.addAndGet(size);
+      return null;
+    }
+
+    // Check if the SLAB allocator is allocating too much memory
+    if (this.totalSLABAllocatedBytes.get() >= slabLimit) {
+      LOG.warn("MSLAB has reached the maximum size of " + slabLimit + " and will no longer be used for allocations.");
+      return null;
+    }
+
+    while (true) {
+      Chunk c = getOrMakeChunk();
+
+      // Try to allocate from this chunk
+      int allocOffset = c.alloc(size);
+      if (allocOffset != -1) {
+        // We succeeded - this is the common case - small alloc
+        // from a big buffer
+        this.numSuccessfulAllocations.incrementAndGet();
+        this.successfullyAllocatedBytes.addAndGet(size);
+        return new Allocation(c.data, allocOffset);
+      }
+
+      // not enough space!
+      // try to retire this chunk
+      tryRetireChunk(c);
+    }
+  }
+
+  /**
+   * Close this instance since it won't be used any more, try to put the chunks
+   * back to pool
+   */
+  void close() {
+    this.closed = true;
+
+    // Get the stats of the final chunk
+    Chunk c = curChunk.get();
+
+    // We could put back the chunks to pool for reusing only when there is no
+    // opening scanner which will read their data
+    if (chunkPool != null && openScannerCount.get() == 0
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+  }
+
+  /**
+   * Called when opening a scanner on the data of this MemStoreLAB
+   */
+  void incScannerCount() {
+    this.openScannerCount.incrementAndGet();
+  }
+
+  /**
+   * Called when closing a scanner on the data of this MemStoreLAB
+   */
+  void decScannerCount() {
+    int count = this.openScannerCount.decrementAndGet();
+
+    if (chunkPool != null && count == 0 && this.closed
+        && reclaimed.compareAndSet(false, true)) {
+      chunkPool.putbackChunks(this.chunkQueue);
+    }
+
+    assert count >= 0;
+  }
+
+  /**
+   * Try to retire the current chunk if it is still
+   * <code>c</code>. Postcondition is that curChunk.get()
+   * != c
+   */
+  private void tryRetireChunk(Chunk c) {
+    if(curChunk.compareAndSet(c, null)) {
+      // We retired the chunk so get the stats
+      //this.usage.addAndGet(c.nextFreeOffset.get());
+      //this.totalSLABAllocatedBytes.addAndGet(c.size);
+      //this.totalAllocations.addAndGet(c.allocCount.get());
+    }
+    // If the CAS succeeds, that means that we won the race
+    // to retire the chunk. We could use this opportunity to
+    // update metrics on external fragmentation.
+    //
+    // If the CAS fails, that means that someone else already
+    // retired the chunk for us.
+  }
+
+  /**
+   * Get the current chunk, or, if there is no current chunk,
+   * allocate a new one from the JVM.
+   */
+  private Chunk getOrMakeChunk() {
+    while (true) {
+      // Try to get the chunk
+      Chunk c = curChunk.get();
+      if (c != null) {
+        return c;
+      }
+
+      // No current chunk, so we want to allocate one. We race
+      // against other allocators to CAS in an uninitialized chunk
+      // (which is cheap to allocate)
+      c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize);
+      if (curChunk.compareAndSet(null, c)) {
+        this.totalSLABAllocatedBytes.addAndGet(c.size);
+        // we won race - now we need to actually do the expensive
+        // allocation step
+        c.init();
+        this.chunkQueue.add(c);
+        return c;
+      } else if (chunkPool != null) {
+        chunkPool.putbackChunk(c);
+      }
+      // someone else won race - that's fine, we'll try to grab theirs
+      // in the next iteration of the loop.
+    }
+  }
+
+  /**
+   * A chunk of memory out of which allocations are sliced.
+   *
+   * {@link #alloc(int)} may be called from many threads at once; space is
+   * claimed with a compare-and-set on {@code nextFreeOffset}.
+   * {@link #init()} must be called by a single thread only.
+   */
+  static class Chunk {
+    /**
+     * Actual underlying data. Null until the first {@link #init()} allocates
+     * it; {@link #reset()} leaves it in place so a reset chunk can be reused.
+     */
+    private byte[] data;
+
+    /** nextFreeOffset sentinel: the data array is not ready yet. */
+    private static final int UNINITIALIZED = -1;
+    /** nextFreeOffset sentinel: {@link #init()} hit an OutOfMemoryError. */
+    private static final int OOM = -2;
+    /**
+     * Offset for the next allocation, or one of the negative sentinels
+     * {@link #UNINITIALIZED} (chunk not initialized yet) or {@link #OOM}
+     * (the data array could not be allocated).
+     */
+    private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+    /** Total number of allocations satisfied from this buffer */
+    private AtomicInteger allocCount = new AtomicInteger();
+
+    /** Size of chunk in bytes */
+    private final int size;
+
+    /**
+     * Create an uninitialized chunk. Note that memory is not allocated yet, so
+     * this is cheap.
+     * @param size in bytes
+     */
+    Chunk(int size) {
+      this.size = size;
+    }
+
+    /**
+     * Actually claim the memory for this chunk (unless an earlier init()
+     * already did), then mark it ready by moving nextFreeOffset from
+     * UNINITIALIZED to 0. This should only be called from one thread;
+     * threads concurrently calling alloc() spin (yielding) until it is done.
+     *
+     * @throws OutOfMemoryError if the data array cannot be allocated; the
+     *         chunk is then left in the OOM state and alloc() returns -1
+     * @throws IllegalStateException if the chunk was no longer UNINITIALIZED
+     *         (with assertions enabled an AssertionError is raised first)
+     */
+    protected void init() {
+      assert nextFreeOffset.get() == UNINITIALIZED;
+      try {
+        if (data == null) {
+          data = new byte[size];
+        }
+      } catch (OutOfMemoryError e) {
+        boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+        assert failInit; // should be true.
+        throw e;
+      }
+      // Mark that it's ready for use
+      boolean initialized = nextFreeOffset.compareAndSet(
+          UNINITIALIZED, 0);
+      // We should always succeed the above CAS since only one thread
+      // calls init()!
+      Preconditions.checkState(initialized,
+          "Multiple threads tried to init same chunk");
+    }
+
+    /**
+     * Reset the offset to UNINITIALIZED (and the allocation count to zero)
+     * before reusing an old chunk. The data array, if any, is kept so the
+     * next {@link #init()} does not have to allocate again.
+     */
+    void reset() {
+      if (nextFreeOffset.get() != UNINITIALIZED) {
+        nextFreeOffset.set(UNINITIALIZED);
+        allocCount.set(0);
+      }
+    }
+
+    /**
+     * Try to allocate <code>allocSize</code> bytes from the chunk. If the
+     * chunk is still being initialized this spins (yielding) until it is.
+     * @param allocSize number of bytes to claim
+     * @return the offset of the successful allocation, or -1 if the chunk
+     *         does not have enough space left or failed to initialize (OOM)
+     */
+    public int alloc(int allocSize) {
+      while (true) {
+        int oldOffset = nextFreeOffset.get();
+        if (oldOffset == UNINITIALIZED) {
+          // The chunk doesn't have its data allocated yet.
+          // Since we found this in curChunk, we know that whoever
+          // CAS-ed it there is allocating it right now. So spin-loop
+          // shouldn't spin long!
+          Thread.yield();
+          continue;
+        }
+        if (oldOffset == OOM) {
+          // doh we ran out of ram. return -1 to chuck this away.
+          return -1;
+        }
+
+        if (oldOffset + allocSize > data.length) {
+          return -1; // alloc doesn't fit
+        }
+
+        // Try to atomically claim this chunk
+        if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + allocSize)) {
+          // we got the alloc
+          allocCount.incrementAndGet();
+          return oldOffset;
+        }
+        // we raced and lost alloc, try again
+      }
+    }
+
+    /**
+     * Describe this chunk for logging. Waste is only computed once the chunk
+     * is initialized; before that (or after an OOM) "n/a" is printed instead
+     * of dereferencing a possibly-null data array or a sentinel offset.
+     */
+    @Override
+    public String toString() {
+      int offset = nextFreeOffset.get();
+      String waste = (data == null || offset < 0)
+          ? "n/a" : String.valueOf(data.length - offset);
+      return "Chunk@" + System.identityHashCode(this) +
+          " allocs=" + allocCount.get() + " waste=" + waste;
+    }
+  }
+
+  /**
+   * The result of a single allocation: the byte array that the allocation
+   * points into, and the offset in that array where the slice begins.
+   */
+  public static class Allocation {
+    private final byte[] data;
+    private final int offset;
+
+    private Allocation(byte[] data, int off) {
+      this.data = data;
+      this.offset = off;
+    }
+
+    /**
+     * Reports the backing array's capacity (its full length, not the size of
+     * this allocation) and the offset of the slice within it.
+     */
+    @Override
+    public String toString() {
+      return "Allocation(" + "capacity=" + data.length + ", off=" + offset
+          + ")";
+    }
+
+    /** @return the backing array this allocation was sliced from */
+    byte[] getData() {
+      return data;
+    }
+
+    /** @return the offset in {@link #getData()} where the slice begins */
+    int getOffset() {
+      return offset;
+    }
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Sun Jun 30 18:18:28 2013
@@ -188,7 +188,7 @@ public class Store extends SchemaConfigu
    * @param region
    * @param family HColumnDescriptor for this column
    * @param fs file system object
-   * @param conf configuration object
+   * @param confParam configuration object
    * failed.  Can be null.
    * @throws IOException
    */
@@ -250,7 +250,7 @@ public class Store extends SchemaConfigu
     LOG.info("time to purge deletes set to " + timeToPurgeDeletes +
         "ms in store " + this);
 
-    this.memstore = new MemStore(this.comparator);
+    this.memstore = new MemStore(conf, this.comparator);
     this.storeNameStr = getColumnFamilyName();
 
     // Setting up cache configuration for this family
@@ -698,7 +698,9 @@ public class Store extends SchemaConfigu
 
   /**
    * Snapshot this stores memstore.  Call before running
-   * {@link #flushCache(long, SortedSet<KeyValue>)} so it has some work to do.
+   * {@link #flushCache(long, java.util.SortedSet, TimeRangeTracker,
+   *                    org.apache.hadoop.hbase.monitoring.MonitoredTask)}
+   * so it has some work to do.
    */
   void snapshot() {
     this.memstore.snapshot();
@@ -999,18 +1001,18 @@ public class Store extends SchemaConfigu
    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
    * completely written-out to disk.
    *
-   * @param CompactionRequest
+   * @param compactionRequest
    *          compaction details obtained from requestCompaction()
    * @throws IOException
    */
-  void compact(CompactionRequest cr) throws IOException {
-    if (cr == null || cr.getFiles().isEmpty()) {
+  void compact(CompactionRequest compactionRequest) throws IOException {
+    if (compactionRequest == null || compactionRequest.getFiles().isEmpty()) {
       return;
     }
-    Preconditions.checkArgument(cr.getStore().toString()
+    Preconditions.checkArgument(compactionRequest.getStore().toString()
         .equals(this.toString()));
 
-    List<StoreFile> filesToCompact = cr.getFiles();
+    List<StoreFile> filesToCompact = compactionRequest.getFiles();
 
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
@@ -1023,31 +1025,32 @@ public class Store extends SchemaConfigu
 
     // Ready to go. Have list of files to compact.
     MonitoredTask status = TaskMonitor.get().createStatus(
-        (cr.isMajor() ? "Major " : "") + "Compaction (ID: " + cr.getCompactSelectionID() + ") of "
+        (compactionRequest.isMajor() ? "Major " : "")
+        + "Compaction (ID: " + compactionRequest.getCompactSelectionID() + ") of "
         + this.storeNameStr + " on "
         + this.region.getRegionInfo().getRegionNameAsString());
-    LOG.info("Starting compaction (ID: " + cr.getCompactSelectionID() + ") of "
+    LOG.info("Starting compaction (ID: " + compactionRequest.getCompactSelectionID() + ") of "
         + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
         + this.region.getRegionInfo().getRegionNameAsString()
         + " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
-        + StringUtils.humanReadableInt(cr.getSize()));
+        + StringUtils.humanReadableInt(compactionRequest.getSize()));
 
     StoreFile sf = null;
     try {
       status.setStatus("Compacting " + filesToCompact.size() + " file(s)");
       long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
-      StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
+      StoreFile.Writer writer = compactStores(filesToCompact, compactionRequest.isMajor(), maxId);
       // Move the compaction into place.
       sf = completeCompaction(filesToCompact, writer);
 
       // Report that the compaction is complete.
       status.markComplete("Completed compaction");
-      LOG.info("Completed" + (cr.isMajor() ? " major " : " ")
-          + "compaction (ID: " + cr.getCompactSelectionID() + ") of "
+      LOG.info("Completed" + (compactionRequest.isMajor() ? " major " : " ")
+          + "compaction (ID: " + compactionRequest.getCompactSelectionID() + ") of "
           + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
           + this.region.getRegionInfo().getRegionNameAsString()
           + "; This selection was in queue for "
-          + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()) + ", and took "
+          + StringUtils.formatTimeDiff(compactionStartTime, compactionRequest.getSelectionTime()) + ", and took "
           + StringUtils.formatTimeDiff(EnvironmentEdgeManager.currentTimeMillis(),
                                        compactionStartTime)
           + " to execute. New storefile name="

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1498167&r1=1498166&r2=1498167&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Sun Jun 30 18:18:28 2013
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.rmi.UnexpectedException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,6 +34,7 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -475,7 +478,7 @@ public class TestMemStore extends TestCa
   }
 
   public void testMultipleVersionsSimple() throws Exception {
-    MemStore m = new MemStore(KeyValue.COMPARATOR);
+    MemStore m = new MemStore(HBaseConfiguration.create() , KeyValue.COMPARATOR);
     byte [] row = Bytes.toBytes("testRow");
     byte [] family = Bytes.toBytes("testFamily");
     byte [] qf = Bytes.toBytes("testQualifier");
@@ -495,7 +498,7 @@ public class TestMemStore extends TestCa
   }
 
   public void testBinary() throws IOException {
-    MemStore mc = new MemStore(KeyValue.ROOT_COMPARATOR);
+    MemStore mc = new MemStore(HBaseConfiguration.create(), KeyValue.ROOT_COMPARATOR);
     final int start = 43;
     final int end = 46;
     for (int k = start; k <= end; k++) {
@@ -763,7 +766,6 @@ public class TestMemStore extends TestCa
     assertEquals(delete, memstore.kvset.first());
   }
 
-
   ////////////////////////////////////
   //Test for timestamps
   ////////////////////////////////////
@@ -796,6 +798,52 @@ public class TestMemStore extends TestCa
     //assertTrue(!memstore.shouldSeek(scan));
   }
 
+  ////////////////////////////////////
+  //Test for OOM with MSLAB
+  ////////////////////////////////////
+
+  /**
+   * Test a pathological pattern that shows why we can't currently
+   * use the MSLAB for upsert workloads. This test inserts data
+   * in the following pattern:
+   *
+   * - row0001 through row1000 (fills up one 2M chunk)
+   * - row0002 through row1001 (fills up another 2M chunk, leaves one
+   *   reference to the first chunk)
+   * - row0003 through row1002 (another chunk, another dangling reference)
+   *
+   * This causes OOME pretty quickly if we use MSLAB for upsert
+   * since each 2M chunk is held onto by a single reference.
+   *
+   * The test only prints heap usage before and after the upserts; it does
+   * not assert anything.
+   */
+  public void testUpsertMSLAB() throws Exception {
+    Configuration mslabConf = HBaseConfiguration.create();
+    mslabConf.setBoolean(MemStore.USE_MSLAB_KEY, true);
+    memstore = new MemStore(mslabConf, KeyValue.COMPARATOR);
+
+    final int rowSize = 2048;
+    byte[] qualifier = new byte[rowSize - 4];
+
+    MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+    int gcRounds = 3;
+    while (gcRounds-- > 0) {
+      System.gc();
+    }
+    long heapUsedBefore = memoryBean.getHeapMemoryUsage().getUsed();
+
+    long totalDelta = 0;
+    long timestamp = 0;
+    for (int value = 0; value < 1000; value++) {
+      // Each pass upserts a window of 1000 rows shifted by one from the last.
+      int windowEnd = value + 1000;
+      for (int row = value; row < windowEnd; row++) {
+        timestamp++;
+        totalDelta += memstore.updateColumnValue(Bytes.toBytes(row), FAMILY,
+            qualifier, value, timestamp);
+      }
+    }
+    System.out.println("Wrote " + timestamp + " vals");
+
+    gcRounds = 3;
+    while (gcRounds-- > 0) {
+      System.gc();
+    }
+    long heapUsedAfter = memoryBean.getHeapMemoryUsage().getUsed();
+    long heapDelta = heapUsedAfter - heapUsedBefore;
+    System.out.println("Memory used: " + heapDelta
+        + " (heapsize: " + memstore.heapSize()
+        + " size: " + totalDelta + ")");
+  }
+
 
   //////////////////////////////////////////////////////////////////////////////
   // Helpers

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java?rev=1498167&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java Sun Jun 30 18:18:28 2013
@@ -0,0 +1,178 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
+
+public class TestMemStoreLAB {
+
+  /**
+   * Test a bunch of random allocations: within each backing array,
+   * allocations must be handed out back to back starting at offset 0 and
+   * must never run past the end of the array. The random seed is included
+   * in the assertion messages so a failing run can be reproduced.
+   */
+  @Test
+  public void testLABRandomAllocation() {
+    long seed = System.currentTimeMillis();
+    Random rand = new Random(seed);
+    MemStoreLAB mslab = new MemStoreLAB();
+    int expectedOff = 0;
+    byte[] lastBuffer = null;
+    // 100K iterations by 0-1K alloc -> 50MB expected
+    // should be reasonable for unit test and also cover wraparound
+    // behavior
+    for (int i = 0; i < 100000; i++) {
+      int size = rand.nextInt(1000);
+      Allocation alloc = mslab.allocateBytes(size);
+
+      if (alloc.getData() != lastBuffer) {
+        // Moved on to a new chunk; a freshly initialized chunk starts at 0.
+        expectedOff = 0;
+        lastBuffer = alloc.getData();
+      }
+      assertEquals("Unexpected offset for " + alloc + " (seed=" + seed + ")",
+          expectedOff, alloc.getOffset());
+      assertTrue("Allocation " + alloc + " overruns buffer (seed=" + seed + ")",
+          alloc.getOffset() + size <= alloc.getData().length);
+      expectedOff += size;
+    }
+  }
+
+  /**
+   * An allocation larger than the default maximum allocation size must not
+   * be served from the LAB: allocateBytes is expected to return null.
+   */
+  @Test
+  public void testLABLargeAllocation() {
+    MemStoreLAB mslab = new MemStoreLAB();
+    Allocation alloc = mslab.allocateBytes(HConstants.MSLAB_MAX_ALLOC_DEFAULT + 1);
+    assertNull("Allocation larger than the max allocation allowed shouldn't be satisfied by LAB.",
+      alloc);
+  }
+
+  /**
+   * Test allocation from lots of threads, making sure the results don't
+   * overlap in any way
+   */
+  @Test
+  public void testLABThreading() throws Exception {
+    Configuration conf = new Configuration();
+    MultithreadedTestUtil.TestContext ctx =
+      new MultithreadedTestUtil.TestContext(conf);
+
+    final AtomicInteger totalAllocated = new AtomicInteger();
+
+    final MemStoreLAB mslab = new MemStoreLAB();
+    List<List<AllocRecord>> allocations = Lists.newArrayList();
+
+    for (int i = 0; i < 10; i++) {
+      // Each list is appended to only by its own thread, and read by the
+      // main thread only after ctx.stop().
+      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
+      allocations.add(allocsByThisThread);
+
+      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+        private Random r = new Random();
+        @Override
+        public void doAnAction() throws Exception {
+          int size = r.nextInt(1000);
+          Allocation alloc = mslab.allocateBytes(size);
+          totalAllocated.addAndGet(size);
+          allocsByThisThread.add(new AllocRecord(alloc, size));
+        }
+      };
+      ctx.addThread(t);
+    }
+
+    ctx.startThreads();
+    while (totalAllocated.get() < 50*1024*1024 && ctx.shouldRun()) {
+      Thread.sleep(10);
+    }
+    ctx.stop();
+
+    // Partition the allocations by the actual byte[] they point into,
+    // make sure offsets are unique for each chunk. Arrays only have
+    // identity equality, so key the map explicitly by identity.
+    Map<byte[], Map<Integer, AllocRecord>> mapsByChunk =
+      Maps.newIdentityHashMap();
+
+    int sizeCounted = 0;
+    for (AllocRecord rec : Iterables.concat(allocations)) {
+      sizeCounted += rec.size;
+      if (rec.size == 0) continue;
+
+      Map<Integer, AllocRecord> mapForThisByteArray =
+        mapsByChunk.get(rec.alloc.getData());
+      if (mapForThisByteArray == null) {
+        mapForThisByteArray = Maps.newTreeMap();
+        mapsByChunk.put(rec.alloc.getData(), mapForThisByteArray);
+      }
+      AllocRecord oldVal = mapForThisByteArray.put(rec.alloc.getOffset(), rec);
+      assertNull("Already had an entry " + oldVal + " for allocation " + rec,
+          oldVal);
+    }
+    // expected = what the threads reported, actual = what we recorded
+    assertEquals("Sanity check test", totalAllocated.get(), sizeCounted);
+
+    // Now check each byte array to make sure allocations don't overlap
+    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
+      int expectedOff = 0;
+      for (AllocRecord alloc : allocsInChunk.values()) {
+        assertEquals(expectedOff, alloc.alloc.getOffset());
+        assertTrue("Allocation " + alloc + " overruns buffer",
+            alloc.alloc.getOffset() + alloc.size <= alloc.alloc.getData().length);
+        expectedOff += alloc.size;
+      }
+    }
+
+  }
+
+  /** One allocation made by a test thread, together with its requested size. */
+  private static class AllocRecord implements Comparable<AllocRecord>{
+    private final Allocation alloc;
+    private final int size;
+    public AllocRecord(Allocation alloc, int size) {
+      this.alloc = alloc;
+      this.size = size;
+    }
+
+    /** Orders records by offset; only meaningful within one backing array. */
+    @Override
+    public int compareTo(AllocRecord e) {
+      if (alloc.getData() != e.alloc.getData()) {
+        throw new IllegalArgumentException("Can only compare within a particular array");
+      }
+      return Ints.compare(alloc.getOffset(), e.alloc.getOffset());
+    }
+
+    @Override
+    public String toString() {
+      return "AllocRecord(alloc=" + alloc + ", size=" + size + ")";
+    }
+
+  }
+}