Posted to commits@hbase.apache.org by zj...@apache.org on 2013/01/14 05:08:18 UTC

svn commit: r1432797 [1/2] - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/ hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ hbase-server/src/tes...

Author: zjushch
Date: Mon Jan 14 04:08:17 2013
New Revision: 1432797

URL: http://svn.apache.org/viewvc?rev=1432797&view=rev
Log:
HBASE-7404 Bucket Cache: a solution for CMS, heap fragmentation, and big caches on HBase (Chunhui)

Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,195 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class manages an array of ByteBuffers, each with a default size of 4MB.
+ * The buffers are laid out sequentially and can be treated as a single large
+ * buffer. It supports reading and writing data at a given position and offset
+ * within that large buffer.
+ */
+@InterfaceAudience.Public
+public final class ByteBufferArray {
+  static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
+
+  static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+  private ByteBuffer buffers[];
+  private Lock locks[];
+  private int bufferSize;
+  private int bufferCount;
+
+  /**
+   * We allocate a number of byte buffers to make up the requested capacity.
+   * To avoid stepping out of the array bounds for the last byte (see
+   * {@link ByteBufferArray#multiple}), we allocate one additional buffer with
+   * capacity 0.
+   * @param capacity total size of the byte buffer array
+   * @param directByteBuffer true if we allocate direct buffer
+   */
+  public ByteBufferArray(long capacity, boolean directByteBuffer) {
+    this.bufferSize = DEFAULT_BUFFER_SIZE;
+    if (this.bufferSize > (capacity / 16))
+      this.bufferSize = (int) roundUp(capacity / 16, 32768);
+    this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
+    LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
+        + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+        + bufferCount);
+    buffers = new ByteBuffer[bufferCount + 1];
+    locks = new Lock[bufferCount + 1];
+    for (int i = 0; i <= bufferCount; i++) {
+      locks[i] = new ReentrantLock();
+      if (i < bufferCount) {
+        buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
+            : ByteBuffer.allocate(bufferSize);
+      } else {
+        buffers[i] = ByteBuffer.allocate(0);
+      }
+
+    }
+  }
+
+  private long roundUp(long n, long to) {
+    return ((n + to - 1) / to) * to;
+  }
+
+  /**
+   * Transfers bytes from this buffer array into the given destination array
+   * @param start start position in the ByteBufferArray
+   * @param len The maximum number of bytes to be written to the given array
+   * @param dstArray The array into which bytes are to be written
+   */
+  public void getMultiple(long start, int len, byte[] dstArray) {
+    getMultiple(start, len, dstArray, 0);
+  }
+
+  /**
+   * Transfers bytes from this buffer array into the given destination array
+   * @param start start offset of this buffer array
+   * @param len The maximum number of bytes to be written to the given array
+   * @param dstArray The array into which bytes are to be written
+   * @param dstOffset The offset within the given array of the first byte to be
+   *          written
+   */
+  public void getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
+    multiple(start, len, dstArray, dstOffset, new Visitor() {
+      public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+        bb.get(array, arrayIdx, len);
+      }
+    });
+  }
+
+  /**
+   * Transfers bytes from the given source array into this buffer array
+   * @param start start offset of this buffer array
+   * @param len The maximum number of bytes to be read from the given array
+   * @param srcArray The array from which bytes are to be read
+   */
+  public void putMultiple(long start, int len, byte[] srcArray) {
+    putMultiple(start, len, srcArray, 0);
+  }
+
+  /**
+   * Transfers bytes from the given source array into this buffer array
+   * @param start start offset of this buffer array
+   * @param len The maximum number of bytes to be read from the given array
+   * @param srcArray The array from which bytes are to be read
+   * @param srcOffset The offset within the given array of the first byte to be
+   *          read
+   */
+  public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
+    multiple(start, len, srcArray, srcOffset, new Visitor() {
+      public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+        bb.put(array, arrayIdx, len);
+      }
+    });
+  }
+
+  private interface Visitor {
+    /**
+     * Visit the given byte buffer. For a read action, bytes are transferred
+     * from the buffer to the destination array; for a write action, bytes are
+     * transferred from the source array to the buffer.
+     * @param bb byte buffer
+     * @param array a source or destination byte array
+     * @param arrayOffset offset of the byte array
+     * @param len read/write length
+     */
+    void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
+  }
+
+  /**
+   * Access (read or write) this buffer array at the given position and length,
+   * using the given array. Only one buffer is locked at a time even when the
+   * access spans several buffers; consistency across buffers is guaranteed by
+   * the caller.
+   * @param start start offset of this buffer array
+   * @param len The maximum number of bytes to be accessed
+   * @param array The array from/to which bytes are to be read/written
+   * @param arrayOffset The offset within the given array of the first byte to
+   *          be read or written
+   * @param visitor implement of how to visit the byte buffer
+   */
+  void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
+    assert len >= 0;
+    long end = start + len;
+    int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
+    int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
+    assert array.length >= len + arrayOffset;
+    assert startBuffer >= 0 && startBuffer < bufferCount;
+    assert endBuffer >= 0 && endBuffer < bufferCount
+        || (endBuffer == bufferCount && endOffset == 0);
+    if (startBuffer >= locks.length || startBuffer < 0) {
+      String msg = "Failed multiple, start=" + start + ",startBuffer="
+          + startBuffer + ",bufferSize=" + bufferSize;
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+    int srcIndex = 0, cnt = -1;
+    for (int i = startBuffer; i <= endBuffer; ++i) {
+      Lock lock = locks[i];
+      lock.lock();
+      try {
+        ByteBuffer bb = buffers[i];
+        if (i == startBuffer) {
+          cnt = bufferSize - startOffset;
+          if (cnt > len) cnt = len;
+          bb.limit(startOffset + cnt).position(startOffset);
+        } else if (i == endBuffer) {
+          cnt = endOffset;
+          bb.limit(cnt).position(0);
+        } else {
+          cnt = bufferSize;
+          bb.limit(cnt).position(0);
+        }
+        visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
+        srcIndex += cnt;
+      } finally {
+        lock.unlock();
+      }
+    }
+    assert srcIndex == len;
+  }
+}

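For orientation, here is a minimal usage sketch of the ByteBufferArray added above. The driver class, sizes, and values are illustrative and not part of this commit:

    import org.apache.hadoop.hbase.util.ByteBufferArray;

    public class ByteBufferArrayDemo {
      public static void main(String[] args) {
        // 64MB capacity backed by direct (off-heap) buffers; with the default
        // 4MB buffer size this becomes sixteen buffers plus one zero-capacity
        // guard buffer at the end.
        ByteBufferArray bba = new ByteBufferArray(64 * 1024 * 1024, true);
        byte[] block = new byte[8 * 1024];
        block[0] = 42;
        // Write 8KB starting 4KB before the first 4MB boundary, so the access
        // straddles two underlying buffers; multiple() handles the split.
        long pos = 4 * 1024 * 1024L - 4 * 1024;
        bba.putMultiple(pos, block.length, block);
        byte[] readBack = new byte[block.length];
        bba.getMultiple(pos, readBack.length, readBack);
        assert readBack[0] == 42; // round trip preserved the data
      }
    }
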
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Mon Jan 14 04:08:17 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Clas
  * Cache Key for use with implementations of {@link BlockCache}
  */
 @InterfaceAudience.Private
-public class BlockCacheKey implements HeapSize {
+public class BlockCacheKey implements HeapSize, java.io.Serializable {
   private final String hfileName;
   private final long offset;
   private final DataBlockEncoding encoding;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java Mon Jan 14 04:08:17 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryUsage;
 
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.DirectMemoryUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -72,6 +74,28 @@ public class CacheConfig {
   public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
       "hbase.rs.evictblocksonclose";
 
+  /**
+   * Configuration keys for Bucket cache
+   */
+  public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
+  public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
+  public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = 
+      "hbase.bucketcache.persistent.path";
+  public static final String BUCKET_CACHE_COMBINED_KEY = 
+      "hbase.bucketcache.combinedcache.enabled";
+  public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY = 
+      "hbase.bucketcache.percentage.in.combinedcache";
+  public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
+  public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = 
+      "hbase.bucketcache.writer.queuelength";
+  /**
+   * Defaults for Bucket cache
+   */
+  public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true;
+  public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
+  public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
+  public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
+
   // Defaults
 
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -341,19 +365,60 @@ public class CacheConfig {
 
     // Calculate the amount of heap to give the block cache.
     MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-    long cacheSize = (long)(mu.getMax() * cachePercentage);
+    long lruCacheSize = (long) (mu.getMax() * cachePercentage);
     int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
         HFile.DEFAULT_BLOCKSIZE);
     long offHeapCacheSize =
       (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
           DirectMemoryUtils.getDirectMemorySize());
-    LOG.info("Allocating LruBlockCache with maximum size " +
-      StringUtils.humanReadableInt(cacheSize));
     if (offHeapCacheSize <= 0) {
-      globalBlockCache = new LruBlockCache(cacheSize,
-          StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+      String bucketCacheIOEngineName = conf
+          .get(BUCKET_CACHE_IOENGINE_KEY, null);
+      float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
+      // A percentage of max heap size, or an absolute value in megabytes
+      long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax()
+          * bucketCachePercentage : bucketCachePercentage * 1024 * 1024);
+
+      boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
+          DEFAULT_BUCKET_CACHE_COMBINED);
+      BucketCache bucketCache = null;
+      if (bucketCacheIOEngineName != null && bucketCacheSize > 0) {
+        int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY,
+            DEFAULT_BUCKET_CACHE_WRITER_THREADS);
+        int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY,
+            DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
+        String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
+        float combinedPercentage = conf.getFloat(
+            BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
+            DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
+        if (combinedWithLru) {
+          lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
+          bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
+        }
+        try {
+          int ioErrorsTolerationDuration = conf.getInt(
+              "hbase.bucketcache.ioengine.errors.tolerated.duration",
+              BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
+          bucketCache = new BucketCache(bucketCacheIOEngineName,
+              bucketCacheSize, writerThreads, writerQueueLen, persistentPath,
+              ioErrorsTolerationDuration);
+        } catch (IOException ioex) {
+          LOG.error("Can't instantiate bucket cache", ioex);
+          throw new RuntimeException(ioex);
+        }
+      }
+      LOG.info("Allocating LruBlockCache with maximum size "
+          + StringUtils.humanReadableInt(lruCacheSize));
+      LruBlockCache lruCache = new LruBlockCache(lruCacheSize,
+          StoreFile.DEFAULT_BLOCKSIZE_SMALL);
+      lruCache.setVictimCache(bucketCache);
+      if (bucketCache != null && combinedWithLru) {
+        globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
+      } else {
+        globalBlockCache = lruCache;
+      }
     } else {
-      globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
+      globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize,
           StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
     }
     return globalBlockCache;

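A hedged configuration sketch (a fragment, not a complete class) for the new code path above. The keys come from CacheConfig as modified here; the "offheap" engine name is an assumption, since the accepted engine names are resolved inside BucketCache, which appears in part 2/2 of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    Configuration conf = HBaseConfiguration.create();
    // Engine name is an assumption; see BucketCache in part 2/2 for the
    // values it actually accepts.
    conf.set(CacheConfig.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    // Values < 1 are read as a fraction of max heap; values >= 1 as megabytes.
    conf.setFloat(CacheConfig.BUCKET_CACHE_SIZE_KEY, 4096f); // 4096MB total
    // Combined mode (the default): 90% of the space goes to the bucket cache
    // and the remaining 10% to the on-heap LRU cache, per the code above.
    conf.setBoolean(CacheConfig.BUCKET_CACHE_COMBINED_KEY, true);
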
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java Mon Jan 14 04:08:17 2013
@@ -171,6 +171,22 @@ public class CacheStats {
     windowIndex = (windowIndex + 1) % numPeriodsInWindow;
   }
 
+  public long getSumHitCountsPastNPeriods() {
+    return sum(hitCounts);
+  }
+
+  public long getSumRequestCountsPastNPeriods() {
+    return sum(requestCounts);
+  }
+
+  public long getSumHitCachingCountsPastNPeriods() {
+    return sum(hitCachingCounts);
+  }
+
+  public long getSumRequestCachingCountsPastNPeriods() {
+    return sum(requestCachingCounts);
+  }
+
   public double getHitRatioPastNPeriods() {
     double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts));
     return Double.isNaN(ratio) ? 0 : ratio;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Mon Jan 14 04:08:17 2013
@@ -56,4 +56,9 @@ public interface Cacheable extends HeapS
    */
   public CacheableDeserializer<Cacheable> getDeserializer();
 
+  /**
+   * @return the block type of this cached HFile block
+   */
+  public BlockType getBlockType();
+
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java Mon Jan 14 04:08:17 2013
@@ -34,4 +34,21 @@ public interface CacheableDeserializer<T
    * @return T the deserialized object.
    */
   public T deserialize(ByteBuffer b) throws IOException;
+
+  /**
+   * Deserialize the given ByteBuffer, optionally reusing it as the content of
+   * the resulting object.
+   * @param b the ByteBuffer to deserialize from
+   * @param reuse true if the Cacheable object may use the given buffer as its
+   *          content
+   * @return T the deserialized object.
+   * @throws IOException if the data cannot be deserialized
+   */
+  public T deserialize(ByteBuffer b, boolean reuse) throws IOException;
+
+  /**
+   * Get the identifier of this deserializer. The identifier is unique for each
+   * deserializer and is generated by {@link CacheableDeserializerIdManager}.
+   * @return identifier number of this cacheable deserializer
+   */
+  public int getDeserialiserIdentifier();
 }

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,59 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class is used to manage the identifiers for
+ * {@link CacheableDeserializer}
+ */
+@InterfaceAudience.Private
+public class CacheableDeserializerIdManager {
+  private static final Map<Integer, CacheableDeserializer<Cacheable>> registeredDeserializers =
+    new HashMap<Integer, CacheableDeserializer<Cacheable>>();
+  private static final AtomicInteger identifier = new AtomicInteger(0);
+
+  /**
+   * Register the given cacheable deserializer and generate a unique identifier
+   * for it.
+   * @param cd the deserializer to register
+   * @return the identifier of the given cacheable deserializer
+   */
+  public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
+    int idx = identifier.incrementAndGet();
+    synchronized (registeredDeserializers) {
+      registeredDeserializers.put(idx, cd);
+    }
+    return idx;
+  }
+
+  /**
+   * Get the cacheable deserializer registered under the given identifier.
+   * @param id the identifier returned by {@link #registerDeserializer}
+   * @return the registered CacheableDeserializer, or null if none is registered
+   */
+  public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
+    return registeredDeserializers.get(id);
+  }
+}

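To make the register/lookup contract concrete, here is a sketch mirroring the pattern HFileBlock uses below. It is a fragment (imports of ByteBuffer, IOException, and the hfile classes assumed), and the demo deserializer is hypothetical; it does not parse a real payload:

    final int[] idHolder = new int[1];
    CacheableDeserializer<Cacheable> demo = new CacheableDeserializer<Cacheable>() {
      public Cacheable deserialize(ByteBuffer b) throws IOException {
        return deserialize(b, false);
      }
      public Cacheable deserialize(ByteBuffer b, boolean reuse) throws IOException {
        throw new UnsupportedOperationException("demo only; no real payload format");
      }
      public int getDeserialiserIdentifier() {
        return idHolder[0]; // the id handed out at registration time
      }
    };
    idHolder[0] = CacheableDeserializerIdManager.registerDeserializer(demo);
    // A cache entry can now carry only the int id and recover the deserializer:
    assert CacheableDeserializerIdManager.getDeserializer(idHolder[0]) == demo;
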
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,215 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+
+/**
+ * CombinedBlockCache is an abstraction layer that combines
+ * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
+ * to cache bloom blocks and index blocks, while the larger bucketCache is used
+ * to cache data blocks. getBlock reads first from the smaller lruCache before
+ * looking for the block in the bucketCache. Metrics are the combined size,
+ * hits, and misses of both caches.
+ * 
+ **/
+@InterfaceAudience.Private
+public class CombinedBlockCache implements BlockCache, HeapSize {
+
+  private final LruBlockCache lruCache;
+  private final BucketCache bucketCache;
+  private final CombinedCacheStats combinedCacheStats;
+
+  public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) {
+    this.lruCache = lruCache;
+    this.bucketCache = bucketCache;
+    this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(),
+        bucketCache.getStats());
+  }
+
+  @Override
+  public long heapSize() {
+    return lruCache.heapSize() + bucketCache.heapSize();
+  }
+
+
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+    boolean isMetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
+    if (isMetaBlock) {
+      lruCache.cacheBlock(cacheKey, buf, inMemory);
+    } else {
+      bucketCache.cacheBlock(cacheKey, buf, inMemory);
+    }
+  }
+
+
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
+    cacheBlock(cacheKey, buf, false);
+  }
+
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
+      boolean repeat) {
+    if (lruCache.containsBlock(cacheKey)) {
+      return lruCache.getBlock(cacheKey, caching, repeat);
+    }
+    return bucketCache.getBlock(cacheKey, caching, repeat);
+
+  }
+
+  @Override
+  public boolean evictBlock(BlockCacheKey cacheKey) {
+    return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey);
+  }
+
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    return lruCache.evictBlocksByHfileName(hfileName)
+        + bucketCache.evictBlocksByHfileName(hfileName);
+  }
+
+  @Override
+  public CacheStats getStats() {
+    return this.combinedCacheStats;
+  }
+
+  @Override
+  public void shutdown() {
+    lruCache.shutdown();
+    bucketCache.shutdown();
+  }
+
+  @Override
+  public long size() {
+    return lruCache.size() + bucketCache.size();
+  }
+
+  @Override
+  public long getFreeSize() {
+    return lruCache.getFreeSize() + bucketCache.getFreeSize();
+  }
+
+  @Override
+  public long getCurrentSize() {
+    return lruCache.getCurrentSize() + bucketCache.getCurrentSize();
+  }
+
+  @Override
+  public long getEvictedCount() {
+    return lruCache.getEvictedCount() + bucketCache.getEvictedCount();
+  }
+
+  @Override
+  public long getBlockCount() {
+    return lruCache.getBlockCount() + bucketCache.getBlockCount();
+  }
+
+  @Override
+  public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
+      Configuration conf) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  private static class CombinedCacheStats extends CacheStats {
+    private final CacheStats lruCacheStats;
+    private final CacheStats bucketCacheStats;
+
+    CombinedCacheStats(CacheStats lbcStats, CacheStats fcStats) {
+      this.lruCacheStats = lbcStats;
+      this.bucketCacheStats = fcStats;
+    }
+
+    @Override
+    public long getRequestCount() {
+      return lruCacheStats.getRequestCount()
+          + bucketCacheStats.getRequestCount();
+    }
+
+    @Override
+    public long getRequestCachingCount() {
+      return lruCacheStats.getRequestCachingCount()
+          + bucketCacheStats.getRequestCachingCount();
+    }
+
+    @Override
+    public long getMissCount() {
+      return lruCacheStats.getMissCount() + bucketCacheStats.getMissCount();
+    }
+
+    @Override
+    public long getMissCachingCount() {
+      return lruCacheStats.getMissCachingCount()
+          + bucketCacheStats.getMissCachingCount();
+    }
+
+    @Override
+    public long getHitCount() {
+      return lruCacheStats.getHitCount() + bucketCacheStats.getHitCount();
+    }
+
+    @Override
+    public long getHitCachingCount() {
+      return lruCacheStats.getHitCachingCount()
+          + bucketCacheStats.getHitCachingCount();
+    }
+
+    @Override
+    public long getEvictionCount() {
+      return lruCacheStats.getEvictionCount()
+          + bucketCacheStats.getEvictionCount();
+    }
+
+    @Override
+    public long getEvictedCount() {
+      return lruCacheStats.getEvictedCount()
+          + bucketCacheStats.getEvictedCount();
+    }
+
+    @Override
+    public double getHitRatioPastNPeriods() {
+      double ratio = ((double) (lruCacheStats.getSumHitCountsPastNPeriods() + bucketCacheStats
+          .getSumHitCountsPastNPeriods()) / (double) (lruCacheStats
+          .getSumRequestCountsPastNPeriods() + bucketCacheStats
+          .getSumRequestCountsPastNPeriods()));
+      return Double.isNaN(ratio) ? 0 : ratio;
+    }
+
+    @Override
+    public double getHitCachingRatioPastNPeriods() {
+      double ratio = ((double) (lruCacheStats
+          .getSumHitCachingCountsPastNPeriods() + bucketCacheStats
+          .getSumHitCachingCountsPastNPeriods()) / (double) (lruCacheStats
+          .getSumRequestCachingCountsPastNPeriods() + bucketCacheStats
+          .getSumRequestCachingCountsPastNPeriods()));
+      return Double.isNaN(ratio) ? 0 : ratio;
+    }
+
+  }
+
+}

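A worked instance of the combined hit ratio above, using illustrative numbers for the window sums returned by the new getSum*PastNPeriods() accessors on CacheStats:

    long lruHits = 80, bucketHits = 40;
    long lruRequests = 100, bucketRequests = 100;
    double ratio = (double) (lruHits + bucketHits)
        / (double) (lruRequests + bucketRequests); // = 120 / 200 = 0.6
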
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Mon Jan 14 04:08:17 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encodi
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -129,8 +130,9 @@ public class HFileBlock implements Cache
   public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
       ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
 
-  static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
-      Bytes.SIZEOF_INT;
+  // minorVersion+offset+nextBlockOnDiskSizeWithHeader
+  public static final int EXTRA_SERIALIZATION_SPACE = 2 * Bytes.SIZEOF_INT
+      + Bytes.SIZEOF_LONG;
 
   /**
    * Each checksum value is an integer that can be stored in 4 bytes.
@@ -139,22 +141,39 @@ public class HFileBlock implements Cache
 
   private static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
-        public HFileBlock deserialize(ByteBuffer buf) throws IOException{
-          ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
-              - HFileBlock.EXTRA_SERIALIZATION_SPACE);
-          buf.limit(buf.limit()
-              - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
-          newByteBuffer.put(buf);
-          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, 
-                                   MINOR_VERSION_NO_CHECKSUM);
-
+        public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
+          buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+          ByteBuffer newByteBuffer;
+          if (reuse) {
+            newByteBuffer = buf.slice();
+          } else {
+            newByteBuffer = ByteBuffer.allocate(buf.limit());
+            newByteBuffer.put(buf);
+          }
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+          int minorVersion = buf.getInt();
+          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion);
           ourBuffer.offset = buf.getLong();
           ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
           return ourBuffer;
         }
+        
+        @Override
+        public int getDeserialiserIdentifier() {
+          return deserializerIdentifier;
+        }
+
+        @Override
+        public HFileBlock deserialize(ByteBuffer b) throws IOException {
+          return deserialize(b, false);
+        }
       };
+  private static final int deserializerIdentifier;
+  static {
+    deserializerIdentifier = CacheableDeserializerIdManager
+        .registerDeserializer(blockDeserializer);
+  }
 
   private BlockType blockType;
 
@@ -359,6 +378,17 @@ public class HFileBlock implements Cache
   }
 
   /**
+   * Returns the buffer of this block, including header data. Clients must not
+   * modify the buffer object. This method has to be public because it is used
+   * in {@link BucketCache} to avoid a buffer copy.
+   * 
+   * @return the byte buffer with header included for read-only operations
+   */
+  public ByteBuffer getBufferReadOnlyWithHeader() {
+    return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+  }
+
+  /**
    * Returns a byte buffer of this block, including header data, positioned at
    * the beginning of header. The underlying data array is not copied.
    *
@@ -1780,7 +1810,17 @@ public class HFileBlock implements Cache
 
   @Override
   public void serialize(ByteBuffer destination) {
-    destination.put(this.buf.duplicate());
+    ByteBuffer dupBuf = this.buf.duplicate();
+    dupBuf.rewind();
+    destination.put(dupBuf);
+    destination.putInt(this.minorVersion);
+    destination.putLong(this.offset);
+    destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+    destination.rewind();
+  }
+
+  public void serializeExtraInfo(ByteBuffer destination) {
+    destination.putInt(this.minorVersion);
     destination.putLong(this.offset);
     destination.putInt(this.nextBlockOnDiskSizeWithHeader);
     destination.rewind();

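The serialization change above appends a 16-byte trailer: an int minorVersion, a long offset, and an int nextBlockOnDiskSizeWithHeader (2 * SIZEOF_INT + SIZEOF_LONG = 16 bytes). Here is a standalone sketch of the buffer gymnastics, using plain java.nio code rather than the HBase API; the payload values are illustrative:

    import java.nio.ByteBuffer;

    public class TrailerDemo {
      static final int EXTRA = 2 * 4 + 8; // matches EXTRA_SERIALIZATION_SPACE
      public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4}; // stands in for the block bytes
        ByteBuffer buf = ByteBuffer.allocate(data.length + EXTRA);
        buf.put(data);
        buf.putInt(1);       // minorVersion
        buf.putLong(12345L); // offset
        buf.putInt(4096);    // nextBlockOnDiskSizeWithHeader
        buf.rewind();
        // Deserialization mirrors blockDeserializer: shrink the limit to strip
        // the trailer, slice (reuse) or copy the data, then read the trailer.
        buf.limit(buf.limit() - EXTRA).rewind();
        ByteBuffer dataOnly = buf.slice(); // the "reuse" path
        buf.position(buf.limit());
        buf.limit(buf.limit() + EXTRA);
        System.out.println("minorVersion=" + buf.getInt() + " offset="
            + buf.getLong() + " nextSize=" + buf.getInt()
            + " dataLen=" + dataOnly.remaining());
      }
    }
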
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Mon Jan 14 04:08:17 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -173,6 +175,9 @@ public class LruBlockCache implements Bl
   /** Overhead of the structure itself */
   private long overhead;
 
+  /** Where to send victims (blocks evicted from the cache) */
+  private BucketCache victimHandler = null;
+
   /**
    * Default constructor.  Specify maximum size and expected average block
    * size (approximation is fine).
@@ -342,6 +347,8 @@ public class LruBlockCache implements Bl
     CachedBlock cb = map.get(cacheKey);
     if(cb == null) {
       if (!repeat) stats.miss(caching);
+      if (victimHandler != null)
+        return victimHandler.getBlock(cacheKey, caching, repeat);
       return null;
     }
     stats.hit(caching);
@@ -349,12 +356,20 @@ public class LruBlockCache implements Bl
     return cb.getBuffer();
   }
 
+  /**
+   * Whether the cache contains the block with the specified cacheKey
+   * @param cacheKey the key of the block to look for
+   * @return true if the cache contains the block
+   */
+  public boolean containsBlock(BlockCacheKey cacheKey) {
+    return map.containsKey(cacheKey);
+  }
 
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
     CachedBlock cb = map.get(cacheKey);
     if (cb == null) return false;
-    evictBlock(cb);
+    evictBlock(cb, false);
     return true;
   }
 
@@ -377,14 +392,31 @@ public class LruBlockCache implements Bl
           ++numEvicted;
       }
     }
+    if (victimHandler != null) {
+      numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+    }
     return numEvicted;
   }
 
-  protected long evictBlock(CachedBlock block) {
+  /**
+   * Evict the block. If a victim handler exists and the block may be read
+   * again later, the block is handed off to the victim handler.
+   * @param block the block to evict
+   * @param evictedByEvictionProcess true if the given block is evicted by the
+   *          EvictionThread
+   * @return the heap size of the evicted block
+   */
+  protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
     map.remove(block.getCacheKey());
     updateSizeMetrics(block, true);
     elements.decrementAndGet();
     stats.evicted();
+    if (evictedByEvictionProcess && victimHandler != null) {
+      boolean wait = getCurrentSize() < acceptableSize();
+      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+          inMemory, wait);
+    }
     return block.heapSize();
   }
 
@@ -512,7 +544,7 @@ public class LruBlockCache implements Bl
       CachedBlock cb;
       long freedBytes = 0;
       while ((cb = queue.pollLast()) != null) {
-        freedBytes += evictBlock(cb);
+        freedBytes += evictBlock(cb, true);
         if (freedBytes >= toFree) {
           return freedBytes;
         }
@@ -703,7 +735,7 @@ public class LruBlockCache implements Bl
   }
 
   public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
-      (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
+      (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
       (5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
       + ClassSize.OBJECT);
 
@@ -772,6 +804,8 @@ public class LruBlockCache implements Bl
   }
 
   public void shutdown() {
+    if (victimHandler != null)
+      victimHandler.shutdown();
     this.scheduleThreadPool.shutdown();
     for (int i = 0; i < 10; i++) {
       if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
@@ -822,4 +856,9 @@ public class LruBlockCache implements Bl
     return counts;
   }
 
+  public void setVictimCache(BucketCache handler) {
+    assert victimHandler == null;
+    victimHandler = handler;
+  }
+
 }

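The new victimHandler wiring above amounts to a two-level cache: misses fall through to the victim, and evictions are pushed into it rather than dropped. A standalone miniature of the pattern, using plain maps instead of real caches (not HBase code):

    import java.util.HashMap;
    import java.util.Map;

    class MiniLruWithVictim {
      private final Map<String, String> map = new HashMap<String, String>();
      private Map<String, String> victimHandler; // plays the role of BucketCache

      void setVictimCache(Map<String, String> handler) {
        victimHandler = handler;
      }

      String getBlock(String key) {
        String v = map.get(key);
        if (v == null && victimHandler != null) {
          return victimHandler.get(key); // miss in L1: consult the victim
        }
        return v;
      }

      void evictBlock(String key) {
        String v = map.remove(key);
        if (v != null && victimHandler != null) {
          victimHandler.put(key, v); // evicted block may be read again later
        }
      }
    }
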
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,549 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+
+/**
+ * This class is used to allocate a block with a specified size and to free the
+ * block when it is evicted. It manages an array of buckets; each bucket is
+ * associated with a size and caches elements up to this size. For a completely
+ * empty bucket, this size can be re-specified dynamically.
+ * 
+ * This class is not thread safe.
+ */
+@InterfaceAudience.Private
+public final class BucketAllocator {
+  static final Log LOG = LogFactory.getLog(BucketAllocator.class);
+
+  final private static class Bucket {
+    private long baseOffset;
+    private int itemAllocationSize, sizeIndex;
+    private int itemCount;
+    private int freeList[];
+    private int freeCount, usedCount;
+
+    public Bucket(long offset) {
+      baseOffset = offset;
+      sizeIndex = -1;
+    }
+
+    void reconfigure(int sizeIndex) {
+      this.sizeIndex = sizeIndex;
+      assert sizeIndex < BUCKET_SIZES.length;
+      itemAllocationSize = BUCKET_SIZES[sizeIndex];
+      itemCount = (int) (((long) BUCKET_CAPACITY) / (long) itemAllocationSize);
+      freeCount = itemCount;
+      usedCount = 0;
+      freeList = new int[itemCount];
+      for (int i = 0; i < freeCount; ++i)
+        freeList[i] = i;
+    }
+
+    public boolean isUninstantiated() {
+      return sizeIndex == -1;
+    }
+
+    public int sizeIndex() {
+      return sizeIndex;
+    }
+
+    public int itemAllocationSize() {
+      return itemAllocationSize;
+    }
+
+    public boolean hasFreeSpace() {
+      return freeCount > 0;
+    }
+
+    public boolean isCompletelyFree() {
+      return usedCount == 0;
+    }
+
+    public int freeCount() {
+      return freeCount;
+    }
+
+    public int usedCount() {
+      return usedCount;
+    }
+
+    public int freeBytes() {
+      return freeCount * itemAllocationSize;
+    }
+
+    public int usedBytes() {
+      return usedCount * itemAllocationSize;
+    }
+
+    public long baseOffset() {
+      return baseOffset;
+    }
+
+    /**
+     * Allocate a block in this bucket, return the offset representing the
+     * position in physical space
+     * @return the offset in the IOEngine
+     */
+    public long allocate() {
+      assert freeCount > 0; // Else should not have been called
+      assert sizeIndex != -1;
+      ++usedCount;
+      long offset = baseOffset + (freeList[--freeCount] * itemAllocationSize);
+      assert offset >= 0;
+      return offset;
+    }
+
+    public void addAllocation(long offset) throws BucketAllocatorException {
+      offset -= baseOffset;
+      if (offset < 0 || offset % itemAllocationSize != 0)
+        throw new BucketAllocatorException(
+            "Attempt to add allocation for bad offset: " + offset + " base="
+                + baseOffset + ", bucket size=" + itemAllocationSize);
+      int idx = (int) (offset / itemAllocationSize);
+      boolean matchFound = false;
+      for (int i = 0; i < freeCount; ++i) {
+        if (matchFound) freeList[i - 1] = freeList[i];
+        else if (freeList[i] == idx) matchFound = true;
+      }
+      if (!matchFound)
+        throw new BucketAllocatorException("Couldn't find match for index "
+            + idx + " in free list");
+      ++usedCount;
+      --freeCount;
+    }
+
+    private void free(long offset) {
+      offset -= baseOffset;
+      assert offset >= 0;
+      assert offset < itemCount * itemAllocationSize;
+      assert offset % itemAllocationSize == 0;
+      assert usedCount > 0;
+      assert freeCount < itemCount; // Else duplicate free
+      int item = (int) (offset / (long) itemAllocationSize);
+      assert !freeListContains(item);
+      --usedCount;
+      freeList[freeCount++] = item;
+    }
+
+    private boolean freeListContains(int blockNo) {
+      for (int i = 0; i < freeCount; ++i) {
+        if (freeList[i] == blockNo) return true;
+      }
+      return false;
+    }
+  }
+
+  final class BucketSizeInfo {
+    // A free bucket has space to allocate a block;
+    // a completely free bucket holds no blocks at all.
+    private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
+    private int sizeIndex;
+
+    BucketSizeInfo(int sizeIndex) {
+      bucketList = new ArrayList<Bucket>();
+      freeBuckets = new ArrayList<Bucket>();
+      completelyFreeBuckets = new ArrayList<Bucket>();
+      this.sizeIndex = sizeIndex;
+    }
+
+    public void instantiateBucket(Bucket b) {
+      assert b.isUninstantiated() || b.isCompletelyFree();
+      b.reconfigure(sizeIndex);
+      bucketList.add(b);
+      freeBuckets.add(b);
+      completelyFreeBuckets.add(b);
+    }
+
+    public int sizeIndex() {
+      return sizeIndex;
+    }
+
+    /**
+     * Find a bucket to allocate a block
+     * @return the offset in the IOEngine
+     */
+    public long allocateBlock() {
+      Bucket b = null;
+      if (freeBuckets.size() > 0) // Use up an existing one first...
+        b = freeBuckets.get(freeBuckets.size() - 1);
+      if (b == null) {
+        b = grabGlobalCompletelyFreeBucket();
+        if (b != null) instantiateBucket(b);
+      }
+      if (b == null) return -1;
+      long result = b.allocate();
+      blockAllocated(b);
+      return result;
+    }
+
+    void blockAllocated(Bucket b) {
+      if (!b.isCompletelyFree()) completelyFreeBuckets.remove(b);
+      if (!b.hasFreeSpace()) freeBuckets.remove(b);
+    }
+
+    public Bucket findAndRemoveCompletelyFreeBucket() {
+      Bucket b = null;
+      assert bucketList.size() > 0;
+      if (bucketList.size() == 1) {
+        // So we never get complete starvation of a bucket for a size
+        return null;
+      }
+
+      if (completelyFreeBuckets.size() > 0) {
+        b = completelyFreeBuckets.get(0);
+        removeBucket(b);
+      }
+      return b;
+    }
+
+    private void removeBucket(Bucket b) {
+      assert b.isCompletelyFree();
+      bucketList.remove(b);
+      freeBuckets.remove(b);
+      completelyFreeBuckets.remove(b);
+    }
+
+    public void freeBlock(Bucket b, long offset) {
+      assert bucketList.contains(b);
+      // else we shouldn't have anything to free...
+      assert (!completelyFreeBuckets.contains(b));
+      b.free(offset);
+      if (!freeBuckets.contains(b)) freeBuckets.add(b);
+      if (b.isCompletelyFree()) completelyFreeBuckets.add(b);
+    }
+
+    public IndexStatistics statistics() {
+      long free = 0, used = 0;
+      for (Bucket b : bucketList) {
+        free += b.freeCount();
+        used += b.usedCount();
+      }
+      return new IndexStatistics(free, used, BUCKET_SIZES[sizeIndex]);
+    }
+  }
+
+  // The default block size is 64K, so we choose more sizes near 64K. You should
+  // reset these according to your cluster's block size distribution.
+  // TODO Make these sizes configurable
+  // TODO Support the view of block size distribution statistics
+  private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
+      16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
+      56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
+      192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
+      512 * 1024 + 1024 };
+
+  /**
+   * Round up the given block size to a bucket size, and get the corresponding
+   * BucketSizeInfo
+   * @param blockSize the size of the block to allocate
+   * @return BucketSizeInfo
+   */
+  public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
+    for (int i = 0; i < BUCKET_SIZES.length; ++i)
+      if (blockSize <= BUCKET_SIZES[i])
+        return bucketSizeInfos[i];
+    return null;
+  }
+
+  static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 512K plus 1K overhead
+  static public final int FEWEST_ITEMS_IN_BUCKET = 4;
+  // The capacity size for each bucket
+  static final long BUCKET_CAPACITY = FEWEST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE;
+
+  private Bucket[] buckets;
+  private BucketSizeInfo[] bucketSizeInfos;
+  private final long totalSize;
+  private long usedSize = 0;
+
+  BucketAllocator(long availableSpace) throws BucketAllocatorException {
+    buckets = new Bucket[(int) (availableSpace / (long) BUCKET_CAPACITY)];
+    if (buckets.length < BUCKET_SIZES.length)
+      throw new BucketAllocatorException(
+          "Bucket allocator size too small - must have room for at least "
+              + BUCKET_SIZES.length + " buckets");
+    bucketSizeInfos = new BucketSizeInfo[BUCKET_SIZES.length];
+    for (int i = 0; i < BUCKET_SIZES.length; ++i) {
+      bucketSizeInfos[i] = new BucketSizeInfo(i);
+    }
+    for (int i = 0; i < buckets.length; ++i) {
+      buckets[i] = new Bucket(BUCKET_CAPACITY * i);
+      bucketSizeInfos[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1]
+          .instantiateBucket(buckets[i]);
+    }
+    this.totalSize = ((long) buckets.length) * BUCKET_CAPACITY;
+  }
+
+  /**
+   * Rebuild the allocator's data structures from a persisted map.
+   * @param availableSpace capacity of cache
+   * @param map A map stores the block key and BucketEntry(block's meta data
+   *          like offset, length)
+   * @param realCacheSize cached data size statistics for bucket cache
+   * @throws BucketAllocatorException
+   */
+  BucketAllocator(long availableSpace, Map<BlockCacheKey, BucketEntry> map,
+      AtomicLong realCacheSize) throws BucketAllocatorException {
+    this(availableSpace);
+
+    // Each bucket has an offset and a size index. The buckets are probably too
+    // big in our default state, so we reconfigure them according to what we've
+    // found. We can only reconfigure each bucket once; if a bucket shows up
+    // more than once, we know there's a bug, so we log the info, throw, and
+    // start again...
+    boolean[] reconfigured = new boolean[buckets.length];
+    for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
+      long foundOffset = entry.getValue().offset();
+      int foundLen = entry.getValue().getLength();
+      int bucketSizeIndex = -1;
+      for (int i = 0; i < BUCKET_SIZES.length; ++i) {
+        if (foundLen <= BUCKET_SIZES[i]) {
+          bucketSizeIndex = i;
+          break;
+        }
+      }
+      if (bucketSizeIndex == -1) {
+        throw new BucketAllocatorException(
+            "Can't match bucket size for the block with size " + foundLen);
+      }
+      int bucketNo = (int) (foundOffset / (long) BUCKET_CAPACITY);
+      if (bucketNo < 0 || bucketNo >= buckets.length)
+        throw new BucketAllocatorException("Can't find bucket " + bucketNo
+            + ", total buckets=" + buckets.length
+            + "; did you shrink the cache?");
+      Bucket b = buckets[bucketNo];
+      if (reconfigured[bucketNo]) {
+        if (b.sizeIndex() != bucketSizeIndex)
+          throw new BucketAllocatorException(
+              "Inconsistent allocation in bucket map;");
+      } else {
+        if (!b.isCompletelyFree())
+          throw new BucketAllocatorException("Reconfiguring bucket "
+              + bucketNo + " but it's already allocated; corrupt data");
+        // Need to remove the bucket from whichever list it's currently in at
+        // the moment...
+        BucketSizeInfo bsi = bucketSizeInfos[bucketSizeIndex];
+        BucketSizeInfo oldbsi = bucketSizeInfos[b.sizeIndex()];
+        oldbsi.removeBucket(b);
+        bsi.instantiateBucket(b);
+        reconfigured[bucketNo] = true;
+      }
+      realCacheSize.addAndGet(foundLen);
+      buckets[bucketNo].addAllocation(foundOffset);
+      usedSize += buckets[bucketNo].itemAllocationSize();
+      bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+    }
+  }
+
+  public String getInfo() {
+    StringBuilder sb = new StringBuilder(1024);
+    for (int i = 0; i < buckets.length; ++i) {
+      Bucket b = buckets[i];
+      sb.append("    Bucket ").append(i).append(": ").append(b.itemAllocationSize());
+      sb.append(" freeCount=").append(b.freeCount()).append(" used=")
+          .append(b.usedCount());
+      sb.append('\n');
+    }
+    return sb.toString();
+  }
+
+  public long getUsedSize() {
+    return this.usedSize;
+  }
+
+  public long getFreeSize() {
+    long freeSize = this.totalSize - getUsedSize();
+    return freeSize;
+  }
+
+  public long getTotalSize() {
+    return this.totalSize;
+  }
+
+  /**
+   * Allocate a block with the specified size. Returns the offset.
+   * @param blockSize size of the block
+   * @throws BucketAllocatorException if the size exceeds the largest bucket size
+   * @throws CacheFullException if no bucket has free space for the block
+   * @return the offset in the IOEngine
+   */
+  public synchronized long allocateBlock(int blockSize) throws CacheFullException,
+      BucketAllocatorException {
+    assert blockSize > 0;
+    BucketSizeInfo bsi = roundUpToBucketSizeInfo(blockSize);
+    if (bsi == null) {
+      throw new BucketAllocatorException("Allocation too big size=" + blockSize);
+    }
+    long offset = bsi.allocateBlock();
+
+    // Ask caller to free up space and try again!
+    if (offset < 0)
+      throw new CacheFullException(blockSize, bsi.sizeIndex());
+    usedSize += BUCKET_SIZES[bsi.sizeIndex()];
+    return offset;
+  }
+
+  private Bucket grabGlobalCompletelyFreeBucket() {
+    for (BucketSizeInfo bsi : bucketSizeInfos) {
+      Bucket b = bsi.findAndRemoveCompletelyFreeBucket();
+      if (b != null) return b;
+    }
+    return null;
+  }
+
+  /**
+   * Free a block with the offset
+   * @param offset block's offset
+   * @return size freed
+   */
+  public synchronized int freeBlock(long offset) {
+    int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+    assert bucketNo >= 0 && bucketNo < buckets.length;
+    Bucket targetBucket = buckets[bucketNo];
+    bucketSizeInfos[targetBucket.sizeIndex()].freeBlock(targetBucket, offset);
+    usedSize -= targetBucket.itemAllocationSize();
+    return targetBucket.itemAllocationSize();
+  }
+
+  public int sizeIndexOfAllocation(long offset) {
+    int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+    assert bucketNo >= 0 && bucketNo < buckets.length;
+    Bucket targetBucket = buckets[bucketNo];
+    return targetBucket.sizeIndex();
+  }
+
+  public int sizeOfAllocation(long offset) {
+    int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+    assert bucketNo >= 0 && bucketNo < buckets.length;
+    Bucket targetBucket = buckets[bucketNo];
+    return targetBucket.itemAllocationSize();
+  }
+
+  static public int getMaximumAllocationIndex() {
+    return BUCKET_SIZES.length;
+  }
+
+  static class IndexStatistics {
+    private long freeCount, usedCount, itemSize, totalCount;
+
+    public long freeCount() {
+      return freeCount;
+    }
+
+    public long usedCount() {
+      return usedCount;
+    }
+
+    public long totalCount() {
+      return totalCount;
+    }
+
+    public long freeBytes() {
+      return freeCount * itemSize;
+    }
+
+    public long usedBytes() {
+      return usedCount * itemSize;
+    }
+
+    public long totalBytes() {
+      return totalCount * itemSize;
+    }
+
+    public long itemSize() {
+      return itemSize;
+    }
+
+    public IndexStatistics(long free, long used, long itemSize) {
+      setTo(free, used, itemSize);
+    }
+
+    public IndexStatistics() {
+      setTo(-1, -1, 0);
+    }
+
+    public void setTo(long free, long used, long itemSize) {
+      this.itemSize = itemSize;
+      this.freeCount = free;
+      this.usedCount = used;
+      this.totalCount = free + used;
+    }
+  }
+
+  public void dumpToLog() {
+    logStatistics();
+    StringBuilder sb = new StringBuilder();
+    for (Bucket b : buckets) {
+      sb.append("Bucket:").append(b.baseOffset).append('\n');
+      sb.append("  Size index: " + b.sizeIndex() + "; Free:" + b.freeCount
+          + "; used:" + b.usedCount + "; freelist\n");
+      for (int i = 0; i < b.freeCount(); ++i)
+        sb.append(b.freeList[i]).append(',');
+      sb.append('\n');
+    }
+    LOG.info(sb);
+  }
+
+  public void logStatistics() {
+    IndexStatistics total = new IndexStatistics();
+    IndexStatistics[] stats = getIndexStatistics(total);
+    LOG.info("Bucket allocator statistics follow:\n");
+    LOG.info("  Free bytes=" + total.freeBytes() + "+; used bytes="
+        + total.usedBytes() + "; total bytes=" + total.totalBytes());
+    for (IndexStatistics s : stats) {
+      LOG.info("  Object size " + s.itemSize() + " used=" + s.usedCount()
+          + "; free=" + s.freeCount() + "; total=" + s.totalCount());
+    }
+  }
+
+  public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
+    IndexStatistics[] stats = getIndexStatistics();
+    long totalfree = 0, totalused = 0;
+    for (IndexStatistics stat : stats) {
+      totalfree += stat.freeBytes();
+      totalused += stat.usedBytes();
+    }
+    grandTotal.setTo(totalfree, totalused, 1);
+    return stats;
+  }
+
+  public IndexStatistics[] getIndexStatistics() {
+    IndexStatistics[] stats = new IndexStatistics[BUCKET_SIZES.length];
+    for (int i = 0; i < stats.length; ++i)
+      stats[i] = bucketSizeInfos[i].statistics();
+    return stats;
+  }
+
+  public long freeBlock(long freeList[]) {
+    long sz = 0;
+    for (int i = 0; i < freeList.length; ++i)
+      sz += freeBlock(freeList[i]);
+    return sz;
+  }
+
+}

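A sketch of the allocator round trip. The constructor is package-private, so code like this would have to live in org.apache.hadoop.hbase.io.hfile.bucket; the sizes are illustrative and follow from the constants above:

    long demo() throws CacheFullException, BucketAllocatorException {
      // Each bucket holds FEWEST_ITEMS_IN_BUCKET (4) items of the largest size
      // class, i.e. 4 * 513K ~ 2MB per bucket; 128MB yields ~63 buckets.
      BucketAllocator ba = new BucketAllocator(128L * 1024 * 1024);
      // A 64K block rounds up to the 64K+1K size class (66560 bytes).
      long offset = ba.allocateBlock(64 * 1024);
      // ... the caller hands 'offset' to the IOEngine for the actual I/O ...
      int freed = ba.freeBlock(offset); // returns the per-item size, 66560
      assert freed == 66560;
      return offset;
    }
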
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,35 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown by {@link BucketAllocator}
+ */
+@InterfaceAudience.Private
+public class BucketAllocatorException extends IOException {
+  private static final long serialVersionUID = 2479119906660788096L;
+
+  BucketAllocatorException(String reason) {
+    super(reason);
+  }
+}
\ No newline at end of file