You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zj...@apache.org on 2013/01/14 05:08:18 UTC
svn commit: r1432797 [1/2] - in /hbase/trunk:
hbase-common/src/main/java/org/apache/hadoop/hbase/util/
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/
hbase-server/src/tes...
Author: zjushch
Date: Mon Jan 14 04:08:17 2013
New Revision: 1432797
URL: http://svn.apache.org/viewvc?rev=1432797&view=rev
Log:
HBASE-7404 Bucket Cache:A solution about CMS,Heap Fragment and Big Cache on HBASE (Chunhui)
Added:
hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCacheStats.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CacheFullException.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,195 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This class manages an array of ByteBuffers with a default size 4MB. These
+ * buffers are sequential and could be considered as a large buffer. It supports
+ * reading/writing data from this large buffer with a start position and a
+ * length.
+ */
+@InterfaceAudience.Public
+public final class ByteBufferArray {
+  static final Log LOG = LogFactory.getLog(ByteBufferArray.class);
+
+  static final int DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
+  /** Unit (32KB) a reduced per-buffer size is rounded up to; also its minimum. */
+  private static final int MIN_BUFFER_SIZE = 32768;
+  private ByteBuffer buffers[];
+  private Lock locks[];
+  private int bufferSize;
+  private int bufferCount;
+
+  /**
+   * We allocate a number of byte buffers as the capacity. So that an access
+   * ending exactly at the end of the last buffer does not run out of the array
+   * bounds (see {@link ByteBufferArray#multiple}), we allocate one additional
+   * buffer with capacity 0.
+   * <p>
+   * Each buffer is {@link #DEFAULT_BUFFER_SIZE} bytes unless that is more than
+   * 1/16 of the capacity; in that case the size is capacity/16 rounded up to a
+   * multiple of 32KB, and never less than 32KB.
+   * @param capacity total size of the byte buffer array
+   * @param directByteBuffer true if we allocate direct buffer
+   * @throws IllegalArgumentException if capacity is negative
+   */
+  public ByteBufferArray(long capacity, boolean directByteBuffer) {
+    if (capacity < 0) {
+      throw new IllegalArgumentException("Negative capacity: " + capacity);
+    }
+    this.bufferSize = DEFAULT_BUFFER_SIZE;
+    if (this.bufferSize > (capacity / 16)) {
+      // capacity / 16 is 0 for capacities below 16 bytes; rounding that up
+      // yields 0, which would make the divisions below (and in multiple())
+      // fail with a division by zero. Keep at least one 32KB unit per buffer.
+      this.bufferSize = (int) Math.max(MIN_BUFFER_SIZE,
+          roundUp(capacity / 16, MIN_BUFFER_SIZE));
+    }
+    this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
+    LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
+        + " , sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+        + bufferCount);
+    // One extra slot: a zero-capacity sentinel buffer (and its lock) at index
+    // bufferCount.
+    buffers = new ByteBuffer[bufferCount + 1];
+    locks = new Lock[bufferCount + 1];
+    for (int i = 0; i <= bufferCount; i++) {
+      locks[i] = new ReentrantLock();
+      if (i < bufferCount) {
+        buffers[i] = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize)
+            : ByteBuffer.allocate(bufferSize);
+      } else {
+        buffers[i] = ByteBuffer.allocate(0);
+      }
+    }
+  }
+
+  /**
+   * @param n value to round
+   * @param to step to round to
+   * @return n rounded up to the next multiple of to
+   */
+  private long roundUp(long n, long to) {
+    return ((n + to - 1) / to) * to;
+  }
+
+  /**
+   * Transfers bytes from this buffer array into the given destination array
+   * @param start start position in the ByteBufferArray
+   * @param len The number of bytes to be written to the given array
+   * @param dstArray The array into which bytes are to be written
+   */
+  public void getMultiple(long start, int len, byte[] dstArray) {
+    getMultiple(start, len, dstArray, 0);
+  }
+
+  /**
+   * Transfers bytes from this buffer array into the given destination array
+   * @param start start offset of this buffer array
+   * @param len The number of bytes to be written to the given array
+   * @param dstArray The array into which bytes are to be written
+   * @param dstOffset The offset within the given array of the first byte to be
+   *          written
+   */
+  public void getMultiple(long start, int len, byte[] dstArray, int dstOffset) {
+    multiple(start, len, dstArray, dstOffset, new Visitor() {
+      @Override
+      public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+        bb.get(array, arrayIdx, len);
+      }
+    });
+  }
+
+  /**
+   * Transfers bytes from the given source array into this buffer array
+   * @param start start offset of this buffer array
+   * @param len The number of bytes to be read from the given array
+   * @param srcArray The array from which bytes are to be read
+   */
+  public void putMultiple(long start, int len, byte[] srcArray) {
+    putMultiple(start, len, srcArray, 0);
+  }
+
+  /**
+   * Transfers bytes from the given source array into this buffer array
+   * @param start start offset of this buffer array
+   * @param len The number of bytes to be read from the given array
+   * @param srcArray The array from which bytes are to be read
+   * @param srcOffset The offset within the given array of the first byte to be
+   *          read
+   */
+  public void putMultiple(long start, int len, byte[] srcArray, int srcOffset) {
+    multiple(start, len, srcArray, srcOffset, new Visitor() {
+      @Override
+      public void visit(ByteBuffer bb, byte[] array, int arrayIdx, int len) {
+        bb.put(array, arrayIdx, len);
+      }
+    });
+  }
+
+  private interface Visitor {
+    /**
+     * Visit the given byte buffer, if it is a read action, we will transfer the
+     * bytes from the buffer to the destination array, else if it is a write
+     * action, we will transfer the bytes from the source array to the buffer
+     * @param bb byte buffer
+     * @param array a source or destination byte array
+     * @param arrayOffset offset of the byte array
+     * @param len read/write length
+     */
+    void visit(ByteBuffer bb, byte[] array, int arrayOffset, int len);
+  }
+
+  /**
+   * Access (read or write) this buffer array with a position and length as the
+   * given array. Only one buffer is locked at a time, even if several buffers
+   * have to be visited; consistency across buffers has to be guaranteed by the
+   * caller.
+   * @param start start offset of this buffer array
+   * @param len The number of bytes to be accessed
+   * @param array The array from/to which bytes are to be read/written
+   * @param arrayOffset The offset within the given array of the first byte to
+   *          be read or written
+   * @param visitor implement of how to visit the byte buffer
+   * @throws RuntimeException if the start or the end of the requested range
+   *           lies outside of this buffer array
+   */
+  void multiple(long start, int len, byte[] array, int arrayOffset, Visitor visitor) {
+    assert len >= 0;
+    long end = start + len;
+    int startBuffer = (int) (start / bufferSize), startOffset = (int) (start % bufferSize);
+    int endBuffer = (int) (end / bufferSize), endOffset = (int) (end % bufferSize);
+    assert array.length >= len + arrayOffset;
+    assert startBuffer >= 0 && startBuffer < bufferCount;
+    assert endBuffer >= 0 && endBuffer < bufferCount
+        || (endBuffer == bufferCount && endOffset == 0);
+    if (startBuffer >= locks.length || startBuffer < 0) {
+      String msg = "Failed multiple, start=" + start + ",startBuffer="
+          + startBuffer + ",bufferSize=" + bufferSize;
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+    // Assertions are usually disabled at runtime, so also reject a range whose
+    // end is past the last real buffer before touching any buffer; otherwise
+    // the loop below would fail half-way, after a partial transfer. The
+    // sentinel buffer at index bufferCount may only be hit with offset 0.
+    if (endBuffer >= locks.length || (endBuffer == bufferCount && endOffset != 0)) {
+      String msg = "Failed multiple, end=" + end + ",endBuffer="
+          + endBuffer + ",bufferSize=" + bufferSize;
+      LOG.error(msg);
+      throw new RuntimeException(msg);
+    }
+    int srcIndex = 0, cnt = -1;
+    for (int i = startBuffer; i <= endBuffer; ++i) {
+      Lock lock = locks[i];
+      lock.lock();
+      try {
+        ByteBuffer bb = buffers[i];
+        // Set position/limit of the shared buffer to the slice of it covered
+        // by this access; this is why the buffer is accessed under its lock.
+        if (i == startBuffer) {
+          // First buffer: from startOffset to the end of the buffer, or less
+          // if the whole access fits into this buffer.
+          cnt = bufferSize - startOffset;
+          if (cnt > len) cnt = len;
+          bb.limit(startOffset + cnt).position(startOffset);
+        } else if (i == endBuffer) {
+          // Last buffer: the leading endOffset bytes (may be 0).
+          cnt = endOffset;
+          bb.limit(cnt).position(0);
+        } else {
+          // A buffer in the middle is covered completely.
+          cnt = bufferSize;
+          bb.limit(cnt).position(0);
+        }
+        visitor.visit(bb, array, srcIndex + arrayOffset, cnt);
+        srcIndex += cnt;
+      } finally {
+        lock.unlock();
+      }
+    }
+    assert srcIndex == len;
+  }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheKey.java Mon Jan 14 04:08:17 2013
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.util.Clas
* Cache Key for use with implementations of {@link BlockCache}
*/
@InterfaceAudience.Private
-public class BlockCacheKey implements HeapSize {
+public class BlockCacheKey implements HeapSize, java.io.Serializable {
private final String hfileName;
private final long offset;
private final DataBlockEncoding encoding;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java Mon Jan 14 04:08:17 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
@@ -27,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.util.StringUtils;
@@ -72,6 +74,28 @@ public class CacheConfig {
public static final String EVICT_BLOCKS_ON_CLOSE_KEY =
"hbase.rs.evictblocksonclose";
+ /**
+ * Configuration keys for Bucket cache
+ */
+ public static final String BUCKET_CACHE_IOENGINE_KEY = "hbase.bucketcache.ioengine";
+ public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
+ public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY =
+ "hbase.bucketcache.persistent.path";
+ public static final String BUCKET_CACHE_COMBINED_KEY =
+ "hbase.bucketcache.combinedcache.enabled";
+ public static final String BUCKET_CACHE_COMBINED_PERCENTAGE_KEY =
+ "hbase.bucketcache.percentage.in.combinedcache";
+ public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
+ public static final String BUCKET_CACHE_WRITER_QUEUE_KEY =
+ "hbase.bucketcache.writer.queuelength";
+ /**
+ * Defaults for Bucket cache
+ */
+ public static final boolean DEFAULT_BUCKET_CACHE_COMBINED = true;
+ public static final int DEFAULT_BUCKET_CACHE_WRITER_THREADS = 3;
+ public static final int DEFAULT_BUCKET_CACHE_WRITER_QUEUE = 64;
+ public static final float DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE = 0.9f;
+
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -341,19 +365,60 @@ public class CacheConfig {
// Calculate the amount of heap to give the heap.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
- long cacheSize = (long)(mu.getMax() * cachePercentage);
+ long lruCacheSize = (long) (mu.getMax() * cachePercentage);
int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
HFile.DEFAULT_BLOCKSIZE);
long offHeapCacheSize =
(long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
DirectMemoryUtils.getDirectMemorySize());
- LOG.info("Allocating LruBlockCache with maximum size " +
- StringUtils.humanReadableInt(cacheSize));
if (offHeapCacheSize <= 0) {
- globalBlockCache = new LruBlockCache(cacheSize,
- StoreFile.DEFAULT_BLOCKSIZE_SMALL, conf);
+ String bucketCacheIOEngineName = conf
+ .get(BUCKET_CACHE_IOENGINE_KEY, null);
+ float bucketCachePercentage = conf.getFloat(BUCKET_CACHE_SIZE_KEY, 0F);
+ // A fraction (< 1) of the max heap size, or an absolute size in megabytes
+ long bucketCacheSize = (long) (bucketCachePercentage < 1 ? mu.getMax()
+ * bucketCachePercentage : bucketCachePercentage * 1024 * 1024);
+
+ boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
+ DEFAULT_BUCKET_CACHE_COMBINED);
+ BucketCache bucketCache = null;
+ if (bucketCacheIOEngineName != null && bucketCacheSize > 0) {
+ int writerThreads = conf.getInt(BUCKET_CACHE_WRITER_THREADS_KEY,
+ DEFAULT_BUCKET_CACHE_WRITER_THREADS);
+ int writerQueueLen = conf.getInt(BUCKET_CACHE_WRITER_QUEUE_KEY,
+ DEFAULT_BUCKET_CACHE_WRITER_QUEUE);
+ String persistentPath = conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY);
+ float combinedPercentage = conf.getFloat(
+ BUCKET_CACHE_COMBINED_PERCENTAGE_KEY,
+ DEFAULT_BUCKET_CACHE_COMBINED_PERCENTAGE);
+ if (combinedWithLru) {
+ lruCacheSize = (long) ((1 - combinedPercentage) * bucketCacheSize);
+ bucketCacheSize = (long) (combinedPercentage * bucketCacheSize);
+ }
+ try {
+ int ioErrorsTolerationDuration = conf.getInt(
+ "hbase.bucketcache.ioengine.errors.tolerated.duration",
+ BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
+ bucketCache = new BucketCache(bucketCacheIOEngineName,
+ bucketCacheSize, writerThreads, writerQueueLen, persistentPath,
+ ioErrorsTolerationDuration);
+ } catch (IOException ioex) {
+ LOG.error("Can't instantiate bucket cache", ioex);
+ throw new RuntimeException(ioex);
+ }
+ }
+ LOG.info("Allocating LruBlockCache with maximum size "
+ + StringUtils.humanReadableInt(lruCacheSize));
+ LruBlockCache lruCache = new LruBlockCache(lruCacheSize,
+ StoreFile.DEFAULT_BLOCKSIZE_SMALL);
+ lruCache.setVictimCache(bucketCache);
+ if (bucketCache != null && combinedWithLru) {
+ globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
+ } else {
+ globalBlockCache = lruCache;
+ }
} else {
- globalBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize,
+ globalBlockCache = new DoubleBlockCache(lruCacheSize, offHeapCacheSize,
StoreFile.DEFAULT_BLOCKSIZE_SMALL, blockSize, conf);
}
return globalBlockCache;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java Mon Jan 14 04:08:17 2013
@@ -171,6 +171,22 @@ public class CacheStats {
windowIndex = (windowIndex + 1) % numPeriodsInWindow;
}
+ /**
+ * @return the sum over {@code hitCounts}, i.e. the numerator used by
+ * {@link #getHitRatioPastNPeriods()}
+ */
+ public long getSumHitCountsPastNPeriods() {
+ return sum(hitCounts);
+ }
+
+ /**
+ * @return the sum over {@code requestCounts}, i.e. the denominator used by
+ * {@link #getHitRatioPastNPeriods()}
+ */
+ public long getSumRequestCountsPastNPeriods() {
+ return sum(requestCounts);
+ }
+
+ /**
+ * @return the sum over {@code hitCachingCounts}
+ */
+ public long getSumHitCachingCountsPastNPeriods() {
+ return sum(hitCachingCounts);
+ }
+
+ /**
+ * @return the sum over {@code requestCachingCounts}
+ */
+ public long getSumRequestCachingCountsPastNPeriods() {
+ return sum(requestCachingCounts);
+ }
+
public double getHitRatioPastNPeriods() {
double ratio = ((double)sum(hitCounts)/(double)sum(requestCounts));
return Double.isNaN(ratio) ? 0 : ratio;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Mon Jan 14 04:08:17 2013
@@ -56,4 +56,9 @@ public interface Cacheable extends HeapS
*/
public CacheableDeserializer<Cacheable> getDeserializer();
+ /**
+ * @return the block type of this cached HFile block
+ */
+ public BlockType getBlockType();
+
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java Mon Jan 14 04:08:17 2013
@@ -34,4 +34,21 @@ public interface CacheableDeserializer<T
* @return T the deserialized object.
*/
public T deserialize(ByteBuffer b) throws IOException;
+
+ /**
+ * Deserializes an object from the given buffer.
+ * @param b the buffer to deserialize from
+ * @param reuse true if Cacheable object can use the given buffer as its
+ * content
+ * @return T the deserialized object.
+ * @throws IOException
+ */
+ public T deserialize(ByteBuffer b, boolean reuse) throws IOException;
+
+ /**
+ * Get the identifier of this deserializer. Identifier is unique for each
+ * deserializer and generated by {@link CacheableDeserializerIdManager}
+ * @return identifier number of this cacheable deserializer
+ */
+ public int getDeserialiserIdentifier();
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,59 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This class is used to manage the identifiers for
+ * {@link CacheableDeserializer}. Registered deserializers are kept in a
+ * static map keyed by their identifier; all accesses to that map are
+ * synchronized on the map itself.
+ */
+@InterfaceAudience.Private
+public class CacheableDeserializerIdManager {
+  private static final Map<Integer, CacheableDeserializer<Cacheable>> registeredDeserializers =
+      new HashMap<Integer, CacheableDeserializer<Cacheable>>();
+  private static final AtomicInteger identifier = new AtomicInteger(0);
+
+  /**
+   * Register the given cacheable deserializer and generate an unique identifier
+   * id for it
+   * @param cd the deserializer to register
+   * @return the identifier of given cacheable deserializer
+   */
+  public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
+    int idx = identifier.incrementAndGet();
+    synchronized (registeredDeserializers) {
+      registeredDeserializers.put(idx, cd);
+    }
+    return idx;
+  }
+
+  /**
+   * Get the cacheable deserializer as the given identifier Id
+   * @param id identifier returned by {@link #registerDeserializer}
+   * @return CacheableDeserializer registered under the id, or null if there is
+   *         none
+   */
+  public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
+    // HashMap is not safe for a read concurrent with a put, so read under the
+    // same monitor registerDeserializer() writes under.
+    synchronized (registeredDeserializers) {
+      return registeredDeserializers.get(id);
+    }
+  }
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,215 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+
+/**
+ * CombinedBlockCache is an abstraction layer that combines
+ * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
+ * to cache bloom blocks and index blocks , the larger bucketCache is used to
+ * cache data blocks. getBlock reads first from the smaller lruCache before
+ * looking for the block in the bucketCache. Metrics are the combined size and
+ * hits and misses of both caches.
+ *
+ **/
+@InterfaceAudience.Private
+public class CombinedBlockCache implements BlockCache, HeapSize {
+
+ private final LruBlockCache lruCache;
+ private final BucketCache bucketCache;
+ private final CombinedCacheStats combinedCacheStats;
+
+ public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) {
+ this.lruCache = lruCache;
+ this.bucketCache = bucketCache;
+ this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(),
+ bucketCache.getStats());
+ }
+
+ @Override
+ public long heapSize() {
+ return lruCache.heapSize() + bucketCache.heapSize();
+ }
+
+
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
+ boolean isMetaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
+ if (isMetaBlock) {
+ lruCache.cacheBlock(cacheKey, buf, inMemory);
+ } else {
+ bucketCache.cacheBlock(cacheKey, buf, inMemory);
+ }
+ }
+
+
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
+ cacheBlock(cacheKey, buf, false);
+ }
+
+ @Override
+ public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
+ boolean repeat) {
+ if (lruCache.containsBlock(cacheKey)) {
+ return lruCache.getBlock(cacheKey, caching, repeat);
+ }
+ return bucketCache.getBlock(cacheKey, caching, repeat);
+
+ }
+
+ @Override
+ public boolean evictBlock(BlockCacheKey cacheKey) {
+ return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey);
+ }
+
+ @Override
+ public int evictBlocksByHfileName(String hfileName) {
+ return lruCache.evictBlocksByHfileName(hfileName)
+ + bucketCache.evictBlocksByHfileName(hfileName);
+ }
+
+ @Override
+ public CacheStats getStats() {
+ return this.combinedCacheStats;
+ }
+
+ @Override
+ public void shutdown() {
+ lruCache.shutdown();
+ bucketCache.shutdown();
+
+ }
+
+ @Override
+ public long size() {
+ return lruCache.size() + bucketCache.size();
+ }
+
+ @Override
+ public long getFreeSize() {
+ return lruCache.getFreeSize() + bucketCache.getFreeSize();
+ }
+
+ @Override
+ public long getCurrentSize() {
+ return lruCache.getCurrentSize() + bucketCache.getCurrentSize();
+ }
+
+ @Override
+ public long getEvictedCount() {
+ return lruCache.getEvictedCount() + bucketCache.getEvictedCount();
+ }
+
+ @Override
+ public long getBlockCount() {
+ return lruCache.getBlockCount() + bucketCache.getBlockCount();
+ }
+
+ @Override
+ public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
+ Configuration conf) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ private static class CombinedCacheStats extends CacheStats {
+ private final CacheStats lruCacheStats;
+ private final CacheStats bucketCacheStats;
+
+ CombinedCacheStats(CacheStats lbcStats, CacheStats fcStats) {
+ this.lruCacheStats = lbcStats;
+ this.bucketCacheStats = fcStats;
+ }
+
+ @Override
+ public long getRequestCount() {
+ return lruCacheStats.getRequestCount()
+ + bucketCacheStats.getRequestCount();
+ }
+
+ @Override
+ public long getRequestCachingCount() {
+ return lruCacheStats.getRequestCachingCount()
+ + bucketCacheStats.getRequestCachingCount();
+ }
+
+ @Override
+ public long getMissCount() {
+ return lruCacheStats.getMissCount() + bucketCacheStats.getMissCount();
+ }
+
+ @Override
+ public long getMissCachingCount() {
+ return lruCacheStats.getMissCachingCount()
+ + bucketCacheStats.getMissCachingCount();
+ }
+
+ @Override
+ public long getHitCount() {
+ return lruCacheStats.getHitCount() + bucketCacheStats.getHitCount();
+ }
+
+ @Override
+ public long getHitCachingCount() {
+ return lruCacheStats.getHitCachingCount()
+ + bucketCacheStats.getHitCachingCount();
+ }
+
+ @Override
+ public long getEvictionCount() {
+ return lruCacheStats.getEvictionCount()
+ + bucketCacheStats.getEvictionCount();
+ }
+
+ @Override
+ public long getEvictedCount() {
+ return lruCacheStats.getEvictedCount()
+ + bucketCacheStats.getEvictedCount();
+ }
+
+ @Override
+ public double getHitRatioPastNPeriods() {
+ double ratio = ((double) (lruCacheStats.getSumHitCountsPastNPeriods() + bucketCacheStats
+ .getSumHitCountsPastNPeriods()) / (double) (lruCacheStats
+ .getSumRequestCountsPastNPeriods() + bucketCacheStats
+ .getSumRequestCountsPastNPeriods()));
+ return Double.isNaN(ratio) ? 0 : ratio;
+ }
+
+ @Override
+ public double getHitCachingRatioPastNPeriods() {
+ double ratio = ((double) (lruCacheStats
+ .getSumHitCachingCountsPastNPeriods() + bucketCacheStats
+ .getSumHitCachingCountsPastNPeriods()) / (double) (lruCacheStats
+ .getSumRequestCachingCountsPastNPeriods() + bucketCacheStats
+ .getSumRequestCachingCountsPastNPeriods()));
+ return Double.isNaN(ratio) ? 0 : ratio;
+ }
+
+ }
+
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Mon Jan 14 04:08:17 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.io.encodi
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.regionserver.MemStore;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -129,8 +130,9 @@ public class HFileBlock implements Cache
public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
- static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG +
- Bytes.SIZEOF_INT;
+ // minorVersion+offset+nextBlockOnDiskSizeWithHeader
+ public static final int EXTRA_SERIALIZATION_SPACE = 2 * Bytes.SIZEOF_INT
+ + Bytes.SIZEOF_LONG;
/**
* Each checksum value is an integer that can be stored in 4 bytes.
@@ -139,22 +141,39 @@ public class HFileBlock implements Cache
private static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
- public HFileBlock deserialize(ByteBuffer buf) throws IOException{
- ByteBuffer newByteBuffer = ByteBuffer.allocate(buf.limit()
- - HFileBlock.EXTRA_SERIALIZATION_SPACE);
- buf.limit(buf.limit()
- - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
- newByteBuffer.put(buf);
- HFileBlock ourBuffer = new HFileBlock(newByteBuffer,
- MINOR_VERSION_NO_CHECKSUM);
-
+ public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
+ buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+ ByteBuffer newByteBuffer;
+ if (reuse) {
+ newByteBuffer = buf.slice();
+ } else {
+ newByteBuffer = ByteBuffer.allocate(buf.limit());
+ newByteBuffer.put(buf);
+ }
buf.position(buf.limit());
buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+ int minorVersion=buf.getInt();
+ HFileBlock ourBuffer = new HFileBlock(newByteBuffer, minorVersion);
ourBuffer.offset = buf.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
return ourBuffer;
}
+
+ @Override
+ public int getDeserialiserIdentifier() {
+ return deserializerIdentifier;
+ }
+
+ @Override
+ public HFileBlock deserialize(ByteBuffer b) throws IOException {
+ return deserialize(b, false);
+ }
};
+ private static final int deserializerIdentifier;
+ static {
+ deserializerIdentifier = CacheableDeserializerIdManager
+ .registerDeserializer(blockDeserializer);
+ }
private BlockType blockType;
@@ -359,6 +378,17 @@ public class HFileBlock implements Cache
}
/**
+ * Returns the buffer of this block, including header data. The clients must
+ * not modify the buffer object. This method has to be public because it is
+ * used in {@link BucketCache} to avoid buffer copy.
+ *
+ * @return the byte buffer with header included for read-only operations
+ */
+ public ByteBuffer getBufferReadOnlyWithHeader() {
+ return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice();
+ }
+
+ /**
* Returns a byte buffer of this block, including header data, positioned at
* the beginning of header. The underlying data array is not copied.
*
@@ -1780,7 +1810,17 @@ public class HFileBlock implements Cache
@Override
public void serialize(ByteBuffer destination) {
- destination.put(this.buf.duplicate());
+ ByteBuffer dupBuf = this.buf.duplicate();
+ dupBuf.rewind();
+ destination.put(dupBuf);
+ destination.putInt(this.minorVersion);
+ destination.putLong(this.offset);
+ destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+ destination.rewind();
+ }
+
+ public void serializeExtraInfo(ByteBuffer destination) {
+ destination.putInt(this.minorVersion);
destination.putLong(this.offset);
destination.putInt(this.nextBlockOnDiskSizeWithHeader);
destination.rewind();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1432797&r1=1432796&r2=1432797&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Mon Jan 14 04:08:17 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CachedBlock.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -173,6 +175,9 @@ public class LruBlockCache implements Bl
/** Overhead of the structure itself */
private long overhead;
+ /** Where to send victims (blocks evicted from the cache) */
+ private BucketCache victimHandler = null;
+
/**
* Default constructor. Specify maximum size and expected average block
* size (approximation is fine).
@@ -342,6 +347,8 @@ public class LruBlockCache implements Bl
CachedBlock cb = map.get(cacheKey);
if(cb == null) {
if (!repeat) stats.miss(caching);
+ if (victimHandler != null)
+ return victimHandler.getBlock(cacheKey, caching, repeat);
return null;
}
stats.hit(caching);
@@ -349,12 +356,20 @@ public class LruBlockCache implements Bl
return cb.getBuffer();
}
+ /**
+  * Checks whether this cache's own map holds an entry for the given key.
+  * Only the map is consulted here.
+  * @param cacheKey key of the block to look up
+  * @return true if the map contains an entry for {@code cacheKey}
+  */
+ public boolean containsBlock(BlockCacheKey cacheKey) {
+   return this.map.containsKey(cacheKey);
+ }
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
CachedBlock cb = map.get(cacheKey);
if (cb == null) return false;
- evictBlock(cb);
+ evictBlock(cb, false);
return true;
}
@@ -377,14 +392,31 @@ public class LruBlockCache implements Bl
++numEvicted;
}
}
+ if (victimHandler != null) {
+ numEvicted += victimHandler.evictBlocksByHfileName(hfileName);
+ }
return numEvicted;
}
- protected long evictBlock(CachedBlock block) {
+ /**
+ * Evict the block. If a victim handler is set and the block was evicted by
+ * the eviction process (so it may be read again later), the block is also
+ * handed to the victim handler to be cached there.
+ * @param block the cached block to evict
+ * @param evictedByEvictionProcess true if the given block is evicted by
+ * EvictionThread
+ * @return the heap size of evicted block
+ */
+ protected long evictBlock(CachedBlock block, boolean evictedByEvictionProcess) {
map.remove(block.getCacheKey());
updateSizeMetrics(block, true);
elements.decrementAndGet();
stats.evicted();
+ if (evictedByEvictionProcess && victimHandler != null) {
+ boolean wait = getCurrentSize() < acceptableSize();
+ boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+ victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+ inMemory, wait);
+ }
return block.heapSize();
}
@@ -512,7 +544,7 @@ public class LruBlockCache implements Bl
CachedBlock cb;
long freedBytes = 0;
while ((cb = queue.pollLast()) != null) {
- freedBytes += evictBlock(cb);
+ freedBytes += evictBlock(cb, true);
if (freedBytes >= toFree) {
return freedBytes;
}
@@ -703,7 +735,7 @@ public class LruBlockCache implements Bl
}
public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
- (3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
+ (3 * Bytes.SIZEOF_LONG) + (9 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
+ ClassSize.OBJECT);
@@ -772,6 +804,8 @@ public class LruBlockCache implements Bl
}
public void shutdown() {
+ if (victimHandler != null)
+ victimHandler.shutdown();
this.scheduleThreadPool.shutdown();
for (int i = 0; i < 10; i++) {
if (!this.scheduleThreadPool.isShutdown()) Threads.sleep(10);
@@ -822,4 +856,9 @@ public class LruBlockCache implements Bl
return counts;
}
+ /**
+  * Sets the bucket cache that acts as this cache's victim handler. An
+  * assertion checks that no handler has been set before.
+  * @param handler the bucket cache to use as victim handler
+  */
+ public void setVictimCache(BucketCache handler) {
+   assert this.victimHandler == null;
+   this.victimHandler = handler;
+ }
+
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,549 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BucketEntry;
+
+/**
+ * This class is used to allocate a block with specified size and free the block
+ * when evicting. It manages an array of buckets, each bucket is associated with
+ * a size and caches elements up to this size. For completely empty bucket, this
+ * size could be re-specified dynamically.
+ *
+ * This class is not thread safe.
+ */
+@InterfaceAudience.Private
+public final class BucketAllocator {
+ static final Log LOG = LogFactory.getLog(BucketAllocator.class);
+
+ /**
+  * One fixed-capacity region starting at {@code baseOffset}, divided into
+  * equally sized items once {@link #reconfigure(int)} has been called. The
+  * numbers of the free items are kept in the first {@code freeCount} entries
+  * of {@code freeList}.
+  */
+ private static final class Bucket {
+   private long baseOffset;
+   private int itemAllocationSize;
+   private int sizeIndex;
+   private int itemCount;
+   private int[] freeList;
+   private int freeCount;
+   private int usedCount;
+
+   public Bucket(long offset) {
+     this.baseOffset = offset;
+     // -1 marks a bucket that has not been given an item size yet
+     this.sizeIndex = -1;
+   }
+
+   /**
+    * (Re)initialises this bucket for the item size at the given index of
+    * BUCKET_SIZES: every item becomes free again.
+    */
+   void reconfigure(int sizeIndex) {
+     this.sizeIndex = sizeIndex;
+     assert sizeIndex < BUCKET_SIZES.length;
+     this.itemAllocationSize = BUCKET_SIZES[sizeIndex];
+     this.itemCount = (int) (((long) BUCKET_CAPACITY) / (long) itemAllocationSize);
+     this.freeCount = itemCount;
+     this.usedCount = 0;
+     this.freeList = new int[itemCount];
+     for (int slot = 0; slot < freeList.length; slot++) {
+       freeList[slot] = slot;
+     }
+   }
+
+   public boolean isUninstantiated() {
+     return -1 == sizeIndex;
+   }
+
+   public int sizeIndex() {
+     return this.sizeIndex;
+   }
+
+   public int itemAllocationSize() {
+     return this.itemAllocationSize;
+   }
+
+   public boolean hasFreeSpace() {
+     return 0 < freeCount;
+   }
+
+   public boolean isCompletelyFree() {
+     return 0 == usedCount;
+   }
+
+   public int freeCount() {
+     return this.freeCount;
+   }
+
+   public int usedCount() {
+     return this.usedCount;
+   }
+
+   public int freeBytes() {
+     return itemAllocationSize * freeCount;
+   }
+
+   public int usedBytes() {
+     return itemAllocationSize * usedCount;
+   }
+
+   public long baseOffset() {
+     return this.baseOffset;
+   }
+
+   /**
+    * Allocate a block in this bucket by taking the last entry of the free
+    * list, and return the offset representing its position in physical
+    * space.
+    * @return the offset in the IOEngine
+    */
+   public long allocate() {
+     assert freeCount > 0; // Else should not have been called
+     assert sizeIndex != -1;
+     ++usedCount;
+     --freeCount;
+     int item = freeList[freeCount];
+     long offset = baseOffset + (item * itemAllocationSize);
+     assert offset >= 0;
+     return offset;
+   }
+
+   /**
+    * Marks the item at the given absolute offset as used by removing its
+    * number from the free list.
+    * @param offset absolute offset of the item
+    * @throws BucketAllocatorException if the offset lies before this bucket,
+    *           is not a multiple of the item size, or its item is not
+    *           currently in the free list
+    */
+   public void addAllocation(long offset) throws BucketAllocatorException {
+     offset -= baseOffset;
+     if (offset < 0 || offset % itemAllocationSize != 0) {
+       throw new BucketAllocatorException(
+           "Attempt to add allocation for bad offset: " + offset + " base="
+               + baseOffset + ", bucket size=" + itemAllocationSize);
+     }
+     int idx = (int) (offset / itemAllocationSize);
+     int position = -1;
+     for (int i = 0; i < freeCount && position < 0; i++) {
+       if (freeList[i] == idx) {
+         position = i;
+       }
+     }
+     if (position < 0) {
+       throw new BucketAllocatorException("Couldn't find match for index "
+           + idx + " in free list");
+     }
+     // Close the gap left by the removed entry
+     System.arraycopy(freeList, position + 1, freeList, position,
+         freeCount - position - 1);
+     ++usedCount;
+     --freeCount;
+   }
+
+   /**
+    * Returns the item at the given absolute offset to the free list. The
+    * validity of the offset is only checked by assertions.
+    */
+   private void free(long offset) {
+     offset -= baseOffset;
+     assert offset >= 0;
+     assert offset < itemCount * itemAllocationSize;
+     assert offset % itemAllocationSize == 0;
+     assert usedCount > 0;
+     assert freeCount < itemCount; // Else duplicate free
+     int item = (int) (offset / (long) itemAllocationSize);
+     assert !freeListContains(item);
+     --usedCount;
+     freeList[freeCount] = item;
+     freeCount++;
+   }
+
+   /** Linear search of the live part of the free list. */
+   private boolean freeListContains(int blockNo) {
+     int i = 0;
+     while (i < freeCount) {
+       if (freeList[i] == blockNo) {
+         return true;
+       }
+       i++;
+     }
+     return false;
+   }
+ }
+
+ /**
+  * Book-keeping for all buckets currently configured for one item size (one
+  * index of BUCKET_SIZES).
+  */
+ final class BucketSizeInfo {
+   // bucketList: every bucket configured for this size.
+   // freeBuckets: buckets that still have room for at least one block.
+   // completelyFreeBuckets: buckets that hold no block at all.
+   private List<Bucket> bucketList, freeBuckets, completelyFreeBuckets;
+   private int sizeIndex;
+
+   BucketSizeInfo(int sizeIndex) {
+     this.bucketList = new ArrayList<Bucket>();
+     this.freeBuckets = new ArrayList<Bucket>();
+     this.completelyFreeBuckets = new ArrayList<Bucket>();
+     this.sizeIndex = sizeIndex;
+   }
+
+   /**
+    * Reconfigures the given bucket for this size and registers it in all
+    * three lists.
+    */
+   public void instantiateBucket(Bucket b) {
+     assert b.isUninstantiated() || b.isCompletelyFree();
+     b.reconfigure(this.sizeIndex);
+     bucketList.add(b);
+     freeBuckets.add(b);
+     completelyFreeBuckets.add(b);
+   }
+
+   public int sizeIndex() {
+     return this.sizeIndex;
+   }
+
+   /**
+    * Find a bucket to allocate a block: the last bucket with free space if
+    * there is one, otherwise a completely free bucket taken from any size.
+    * @return the offset in the IOEngine, or -1 if no bucket is available
+    */
+   public long allocateBlock() {
+     Bucket target;
+     if (!freeBuckets.isEmpty()) {
+       // Use up an existing one first...
+       target = freeBuckets.get(freeBuckets.size() - 1);
+     } else {
+       target = grabGlobalCompletelyFreeBucket();
+       if (target == null) {
+         return -1;
+       }
+       instantiateBucket(target);
+     }
+     long result = target.allocate();
+     blockAllocated(target);
+     return result;
+   }
+
+   /** Updates the free lists after a block was allocated in the bucket. */
+   void blockAllocated(Bucket b) {
+     if (!b.isCompletelyFree()) {
+       completelyFreeBuckets.remove(b);
+     }
+     if (!b.hasFreeSpace()) {
+       freeBuckets.remove(b);
+     }
+   }
+
+   /**
+    * Detaches and returns the first completely free bucket of this size.
+    * @return the detached bucket, or null if this size has only one bucket
+    *         or none of its buckets is completely free
+    */
+   public Bucket findAndRemoveCompletelyFreeBucket() {
+     assert bucketList.size() > 0;
+     if (bucketList.size() == 1) {
+       // So we never get complete starvation of a bucket for a size
+       return null;
+     }
+     if (completelyFreeBuckets.isEmpty()) {
+       return null;
+     }
+     Bucket first = completelyFreeBuckets.get(0);
+     removeBucket(first);
+     return first;
+   }
+
+   private void removeBucket(Bucket b) {
+     assert b.isCompletelyFree();
+     bucketList.remove(b);
+     freeBuckets.remove(b);
+     completelyFreeBuckets.remove(b);
+   }
+
+   /** Frees the block at the given offset and refreshes the free lists. */
+   public void freeBlock(Bucket b, long offset) {
+     assert bucketList.contains(b);
+     // else we shouldn't have anything to free...
+     assert (!completelyFreeBuckets.contains(b));
+     b.free(offset);
+     if (!freeBuckets.contains(b)) {
+       freeBuckets.add(b);
+     }
+     if (b.isCompletelyFree()) {
+       completelyFreeBuckets.add(b);
+     }
+   }
+
+   /** Sums the free and used item counts over all buckets of this size. */
+   public IndexStatistics statistics() {
+     long freeItems = 0;
+     long usedItems = 0;
+     for (Bucket bucket : bucketList) {
+       freeItems += bucket.freeCount();
+       usedItems += bucket.usedCount();
+     }
+     return new IndexStatistics(freeItems, usedItems, BUCKET_SIZES[sizeIndex]);
+   }
+ }
+
+ // Default block size is 64K, so we choose more sizes near 64K, you'd better
+ // reset it according to your cluster's block size distribution
+ // TODO Make these sizes configurable
+ // TODO Support the view of block size distribution statistics
+ private static final int BUCKET_SIZES[] = { 4 * 1024 + 1024, 8 * 1024 + 1024,
+ 16 * 1024 + 1024, 32 * 1024 + 1024, 40 * 1024 + 1024, 48 * 1024 + 1024,
+ 56 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 128 * 1024 + 1024,
+ 192 * 1024 + 1024, 256 * 1024 + 1024, 384 * 1024 + 1024,
+ 512 * 1024 + 1024 };
+
+ /**
+ * Round up the given block size to bucket size, and get the corresponding
+ * BucketSizeInfo
+ * @param blockSize
+ * @return BucketSizeInfo
+ */
+ public BucketSizeInfo roundUpToBucketSizeInfo(int blockSize) {
+ for (int i = 0; i < BUCKET_SIZES.length; ++i)
+ if (blockSize <= BUCKET_SIZES[i])
+ return bucketSizeInfos[i];
+ return null;
+ }
+
+ static final int BIG_ITEM_SIZE = (512 * 1024) + 1024; // 512K plus 1K, i.e. 513K in total
+ static public final int FEWEST_ITEMS_IN_BUCKET = 4;
+ // The capacity of each bucket: room for FEWEST_ITEMS_IN_BUCKET items of BIG_ITEM_SIZE
+ static final long BUCKET_CAPACITY = FEWEST_ITEMS_IN_BUCKET * BIG_ITEM_SIZE;
+
+ private Bucket[] buckets;
+ private BucketSizeInfo[] bucketSizeInfos;
+ private final long totalSize;
+ private long usedSize = 0;
+
+ BucketAllocator(long availableSpace) throws BucketAllocatorException {
+ buckets = new Bucket[(int) (availableSpace / (long) BUCKET_CAPACITY)];
+ if (buckets.length < BUCKET_SIZES.length)
+ throw new BucketAllocatorException(
+ "Bucket allocator size too small - must have room for at least "
+ + BUCKET_SIZES.length + " buckets");
+ bucketSizeInfos = new BucketSizeInfo[BUCKET_SIZES.length];
+ for (int i = 0; i < BUCKET_SIZES.length; ++i) {
+ bucketSizeInfos[i] = new BucketSizeInfo(i);
+ }
+ for (int i = 0; i < buckets.length; ++i) {
+ buckets[i] = new Bucket(BUCKET_CAPACITY * i);
+ bucketSizeInfos[i < BUCKET_SIZES.length ? i : BUCKET_SIZES.length - 1]
+ .instantiateBucket(buckets[i]);
+ }
+ this.totalSize = ((long) buckets.length) * BUCKET_CAPACITY;
+ }
+
+ /**
+  * Rebuild the allocator's data structures from a persisted map.
+  * @param availableSpace capacity of cache
+  * @param map A map stores the block key and BucketEntry(block's meta data
+  *          like offset, length)
+  * @param realCacheSize cached data size statistics for bucket cache; the
+  *          length of every entry found in the map is added to it
+  * @throws BucketAllocatorException if an entry does not fit any bucket
+  *           size, points outside the buckets, or conflicts with the size
+  *           already chosen for its bucket
+  */
+ BucketAllocator(long availableSpace, Map<BlockCacheKey, BucketEntry> map,
+     AtomicLong realCacheSize) throws BucketAllocatorException {
+   this(availableSpace);
+
+   // Each bucket has an offset and a size index. The default layout set up
+   // above probably does not match the persisted data, so every bucket is
+   // reconfigured according to the first entry found in it. A bucket may
+   // only be reconfigured once; a later entry asking for a different size
+   // means the map is inconsistent, so we throw.
+   boolean[] reconfigured = new boolean[buckets.length];
+   for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
+     BucketEntry cached = entry.getValue();
+     long foundOffset = cached.offset();
+     int foundLen = cached.getLength();
+     // First bucket size that is large enough for the entry
+     int bucketSizeIndex = 0;
+     while (bucketSizeIndex < BUCKET_SIZES.length
+         && foundLen > BUCKET_SIZES[bucketSizeIndex]) {
+       ++bucketSizeIndex;
+     }
+     if (bucketSizeIndex == BUCKET_SIZES.length) {
+       throw new BucketAllocatorException(
+           "Can't match bucket size for the block with size " + foundLen);
+     }
+     int bucketNo = (int) (foundOffset / (long) BUCKET_CAPACITY);
+     if (bucketNo < 0 || bucketNo >= buckets.length) {
+       throw new BucketAllocatorException("Can't find bucket " + bucketNo
+           + ", total buckets=" + buckets.length
+           + "; did you shrink the cache?");
+     }
+     Bucket b = buckets[bucketNo];
+     if (!reconfigured[bucketNo]) {
+       if (!b.isCompletelyFree()) {
+         throw new BucketAllocatorException("Reconfiguring bucket "
+             + bucketNo + " but it's already allocated; corrupt data");
+       }
+       // Move the bucket from the size it is currently registered under to
+       // the size required by this entry.
+       bucketSizeInfos[b.sizeIndex()].removeBucket(b);
+       bucketSizeInfos[bucketSizeIndex].instantiateBucket(b);
+       reconfigured[bucketNo] = true;
+     } else if (b.sizeIndex() != bucketSizeIndex) {
+       throw new BucketAllocatorException(
+           "Inconsistent allocation in bucket map;");
+     }
+     realCacheSize.addAndGet(foundLen);
+     b.addAllocation(foundOffset);
+     usedSize += b.itemAllocationSize();
+     bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
+   }
+ }
+
+ public String getInfo() {
+ StringBuilder sb = new StringBuilder(1024);
+ for (int i = 0; i < buckets.length; ++i) {
+ Bucket b = buckets[i];
+ sb.append(" Bucket ").append(i).append(": ").append(b.itemAllocationSize());
+ sb.append(" freeCount=").append(b.freeCount()).append(" used=")
+ .append(b.usedCount());
+ sb.append('\n');
+ }
+ return sb.toString();
+ }
+
+ /** @return the sum of the item sizes of all currently allocated blocks */
+ public long getUsedSize() {
+   return usedSize;
+ }
+
+ /** @return the total size minus the used size */
+ public long getFreeSize() {
+   return this.totalSize - getUsedSize();
+ }
+
+ /** @return the number of buckets multiplied by the bucket capacity */
+ public long getTotalSize() {
+   return totalSize;
+ }
+
+ /**
+  * Allocate a block with specified size. Return the offset
+  * @param blockSize size of block
+  * @throws BucketAllocatorException if the size is larger than every
+  *           bucket size
+  * @throws CacheFullException if no bucket can currently take a block of
+  *           the matching size; the caller should free up space and retry
+  * @return the offset in the IOEngine
+  */
+ public synchronized long allocateBlock(int blockSize) throws CacheFullException,
+     BucketAllocatorException {
+   assert blockSize > 0;
+   BucketSizeInfo sizeInfo = roundUpToBucketSizeInfo(blockSize);
+   if (null == sizeInfo) {
+     throw new BucketAllocatorException("Allocation too big size=" + blockSize);
+   }
+   long offset = sizeInfo.allocateBlock();
+   if (offset < 0) {
+     // Ask caller to free up space and try again!
+     throw new CacheFullException(blockSize, sizeInfo.sizeIndex());
+   }
+   usedSize += BUCKET_SIZES[sizeInfo.sizeIndex()];
+   return offset;
+ }
+
+ /**
+  * Asks each bucket size in turn to give up a completely free bucket.
+  * @return the first bucket handed over, or null if no size gives one up
+  */
+ private Bucket grabGlobalCompletelyFreeBucket() {
+   for (int i = 0; i < bucketSizeInfos.length; ++i) {
+     Bucket candidate = bucketSizeInfos[i].findAndRemoveCompletelyFreeBucket();
+     if (candidate != null) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Free a block with the offset
+  * @param offset block's offset
+  * @return size freed, i.e. the item size of the bucket the offset falls in
+  */
+ public synchronized int freeBlock(long offset) {
+   int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+   assert bucketNo >= 0 && bucketNo < buckets.length;
+   Bucket owner = buckets[bucketNo];
+   BucketSizeInfo sizeInfo = bucketSizeInfos[owner.sizeIndex()];
+   sizeInfo.freeBlock(owner, offset);
+   int freed = owner.itemAllocationSize();
+   usedSize -= freed;
+   return freed;
+ }
+
+ /**
+  * @param offset offset of an allocation
+  * @return the size index of the bucket the offset falls in
+  */
+ public int sizeIndexOfAllocation(long offset) {
+   int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+   assert bucketNo >= 0 && bucketNo < buckets.length;
+   return buckets[bucketNo].sizeIndex();
+ }
+
+ /**
+  * @param offset offset of an allocation
+  * @return the item size of the bucket the offset falls in
+  */
+ public int sizeOfAllocation(long offset) {
+   int bucketNo = (int) (offset / (long) BUCKET_CAPACITY);
+   assert bucketNo >= 0 && bucketNo < buckets.length;
+   return buckets[bucketNo].itemAllocationSize();
+ }
+
+ /** @return the number of entries in BUCKET_SIZES */
+ public static int getMaximumAllocationIndex() {
+   return BUCKET_SIZES.length;
+ }
+
+ /**
+  * Free and used counts together with the size of one item; the byte
+  * getters multiply the corresponding count by the item size.
+  */
+ static class IndexStatistics {
+   private long freeCount, usedCount, itemSize, totalCount;
+
+   public IndexStatistics() {
+     this(-1, -1, 0);
+   }
+
+   public IndexStatistics(long free, long used, long itemSize) {
+     setTo(free, used, itemSize);
+   }
+
+   /** Overwrites all values; the total count becomes free plus used. */
+   public void setTo(long free, long used, long itemSize) {
+     this.freeCount = free;
+     this.usedCount = used;
+     this.totalCount = free + used;
+     this.itemSize = itemSize;
+   }
+
+   public long freeCount() {
+     return this.freeCount;
+   }
+
+   public long usedCount() {
+     return this.usedCount;
+   }
+
+   public long totalCount() {
+     return this.totalCount;
+   }
+
+   public long itemSize() {
+     return this.itemSize;
+   }
+
+   public long freeBytes() {
+     return itemSize * freeCount;
+   }
+
+   public long usedBytes() {
+     return itemSize * usedCount;
+   }
+
+   public long totalBytes() {
+     return itemSize * totalCount;
+   }
+ }
+
+ public void dumpToLog() {
+ logStatistics();
+ StringBuilder sb = new StringBuilder();
+ for (Bucket b : buckets) {
+ sb.append("Bucket:").append(b.baseOffset).append('\n');
+ sb.append(" Size index: " + b.sizeIndex() + "; Free:" + b.freeCount
+ + "; used:" + b.usedCount + "; freelist\n");
+ for (int i = 0; i < b.freeCount(); ++i)
+ sb.append(b.freeList[i]).append(',');
+ sb.append('\n');
+ }
+ LOG.info(sb);
+ }
+
+ /**
+  * Logs, at INFO level, the overall free, used and total byte counts,
+  * followed by one line per bucket size with its used, free and total item
+  * counts.
+  */
+ public void logStatistics() {
+   IndexStatistics total = new IndexStatistics();
+   IndexStatistics[] stats = getIndexStatistics(total);
+   LOG.info("Bucket allocator statistics follow:\n");
+   // The separator after the free byte count is a plain "; " - a stray '+'
+   // inside the literal used to be printed right after the number.
+   LOG.info(" Free bytes=" + total.freeBytes() + "; used bytes="
+       + total.usedBytes() + "; total bytes=" + total.totalBytes());
+   for (IndexStatistics s : stats) {
+     LOG.info(" Object size " + s.itemSize() + " used=" + s.usedCount()
+         + "; free=" + s.freeCount() + "; total=" + s.totalCount());
+   }
+ }
+
+ /**
+  * Collects the per-size statistics and stores the grand totals in the
+  * given object, expressed in bytes (item size 1).
+  * @param grandTotal receives the summed free and used bytes of all sizes
+  * @return one statistics object per bucket size
+  */
+ public IndexStatistics[] getIndexStatistics(IndexStatistics grandTotal) {
+   IndexStatistics[] perSize = getIndexStatistics();
+   long freeBytes = 0;
+   long usedBytes = 0;
+   for (int i = 0; i < perSize.length; ++i) {
+     freeBytes += perSize[i].freeBytes();
+     usedBytes += perSize[i].usedBytes();
+   }
+   grandTotal.setTo(freeBytes, usedBytes, 1);
+   return perSize;
+ }
+
+ /** @return one freshly computed statistics object per bucket size */
+ public IndexStatistics[] getIndexStatistics() {
+   final int sizeCount = BUCKET_SIZES.length;
+   IndexStatistics[] perSize = new IndexStatistics[sizeCount];
+   int sizeIdx = 0;
+   while (sizeIdx < sizeCount) {
+     perSize[sizeIdx] = bucketSizeInfos[sizeIdx].statistics();
+     ++sizeIdx;
+   }
+   return perSize;
+ }
+
+ /**
+  * Frees every block whose offset is listed, in order.
+  * @param freeList offsets of the blocks to free
+  * @return the sum of the sizes freed
+  */
+ public long freeBlock(long freeList[]) {
+   long freedTotal = 0;
+   for (long offset : freeList) {
+     freedTotal += freeBlock(offset);
+   }
+   return freedTotal;
+ }
+
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java?rev=1432797&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocatorException.java Mon Jan 14 04:08:17 2013
@@ -0,0 +1,35 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Thrown by {@link BucketAllocator}. This is a checked exception (it extends
+ * {@link IOException}); its only constructor is package-private and takes a
+ * message describing the reason for the failure.
+ */
+@InterfaceAudience.Private
+public class BucketAllocatorException extends IOException {
+ private static final long serialVersionUID = 2479119906660788096L;
+
+ BucketAllocatorException(String reason) {
+ super(reason);
+ }
+}
\ No newline at end of file