Posted to commits@hbase.apache.org by st...@apache.org on 2011/08/26 20:53:01 UTC
svn commit: r1162207 [1/2] - in /hbase/trunk: ./ conf/
src/main/java/org/apache/hadoop/hbase/io/hfile/
src/main/java/org/apache/hadoop/hbase/io/hfile/slab/
src/main/java/org/apache/hadoop/hbase/regionserver/
src/main/java/org/apache/hadoop/hbase/util/ ...
Author: stack
Date: Fri Aug 26 18:53:00 2011
New Revision: 1162207
URL: http://svn.apache.org/viewvc?rev=1162207&view=rev
Log:
HBASE-4027 Enable direct byte buffers LruBlockCache
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSingleSizeCache.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlab.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/slab/TestSlabCache.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/conf/hbase-env.sh
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Aug 26 18:53:00 2011
@@ -463,6 +463,7 @@ Release 0.91.0 - Unreleased
HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last
written version (Lars Hofhansl)
HBASE-4242 Add documentation for HBASE-4071 (Lars Hofhansl)
+ HBASE-4027 Enable direct byte buffers LruBlockCache (Li Pi)
Release 0.90.5 - Unreleased
Modified: hbase/trunk/conf/hbase-env.sh
URL: http://svn.apache.org/viewvc/hbase/trunk/conf/hbase-env.sh?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/conf/hbase-env.sh (original)
+++ hbase/trunk/conf/hbase-env.sh Fri Aug 26 18:53:00 2011
@@ -39,6 +39,11 @@ export HBASE_OPTS="-ea -XX:+UseConcMarkS
# Uncomment below to enable java garbage collection logging in the .out file.
# export HBASE_OPTS="$HBASE_OPTS -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
+# Uncomment below if you intend to use the EXPERIMENTAL off heap cache.
+# export HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize="
+# Set hbase.offheapcachesize in hbase-site.xml
+
+
# Uncomment and adjust to enable JMX exporting
# See jmxremote.password and jmxremote.access in $JRE_HOME/lib/management to configure remote password access.
# More details at: http://java.sun.com/javase/6/docs/technotes/guides/management/agent.html
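For concreteness, a sketch of what those two settings might look like once filled in. The 2g figure below is purely illustrative, not a recommendation; size it to match the hbase.offheapcachesize value you set in hbase-site.xml:

    # In hbase-env.sh -- illustrative value only
    export HBASE_OPTS="$HBASE_OPTS -XX:MaxDirectMemorySize=2g"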
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java Fri Aug 26 18:53:00 2011
@@ -23,14 +23,10 @@ import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
/**
- * Block cache interface. Anything that implements the {@link HeapSize}
- * interface can be put in the cache, because item size is all the cache
- * cares about. We might move to a more specialized "cacheable" interface
- * in the future.
+ * Block cache interface. Anything that implements the {@link Cacheable}
+ * interface can be put in the cache.
*
* TODO: Add filename or hash of filename to block cache key.
*/
@@ -41,22 +37,22 @@ public interface BlockCache {
* @param buf The block contents wrapped in a ByteBuffer.
* @param inMemory Whether block should be treated as in-memory
*/
- public void cacheBlock(String blockName, HeapSize buf, boolean inMemory);
+ public void cacheBlock(String blockName, Cacheable buf, boolean inMemory);
/**
* Add block to cache (defaults to not in-memory).
* @param blockName Zero-based file block number.
- * @param buf The block contents wrapped in a ByteBuffer.
+ * @param buf The object to cache.
*/
- public void cacheBlock(String blockName, HeapSize buf);
+ public void cacheBlock(String blockName, Cacheable buf);
/**
* Fetch block from cache.
* @param blockName Block number to fetch.
* @param caching Whether this request has caching enabled (used for stats)
- * @return Block or null if block is not in the cache.
+ * @return Block or null if block is not in the cache.
*/
- public HeapSize getBlock(String blockName, boolean caching);
+ public Cacheable getBlock(String blockName, boolean caching);
/**
* Evict block from cache.
@@ -94,15 +90,15 @@ public interface BlockCache {
public long getCurrentSize();
public long getEvictedCount();
-
+
/**
* Performs a BlockCache summary and returns a List of BlockCacheColumnFamilySummary objects.
* This method could be fairly heavyweight in that it evaluates the entire HBase file-system
- * against what is in the RegionServer BlockCache.
+ * against what is in the RegionServer BlockCache.
* <br><br>
* The contract of this interface is to return the List in sorted order by Table name, then
* ColumnFamily.
- *
+ *
* @param conf HBaseConfiguration
* @return List of BlockCacheColumnFamilySummary
* @throws IOException exception
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheStats.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,118 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Class that implements cache metrics.
+ */
+public class CacheStats {
+ /** The number of getBlock requests that were cache hits */
+ private final AtomicLong hitCount = new AtomicLong(0);
+ /**
+ * The number of getBlock requests that were cache hits, but only from
+ * requests that were set to use the block cache. This is because all reads
+ * attempt to read from the block cache even if they will not put new blocks
+ * into the block cache. See HBASE-2253 for more information.
+ */
+ private final AtomicLong hitCachingCount = new AtomicLong(0);
+ /** The number of getBlock requests that were cache misses */
+ private final AtomicLong missCount = new AtomicLong(0);
+ /**
+ * The number of getBlock requests that were cache misses, but only from
+ * requests that were set to use the block cache.
+ */
+ private final AtomicLong missCachingCount = new AtomicLong(0);
+ /** The number of times an eviction has occurred */
+ private final AtomicLong evictionCount = new AtomicLong(0);
+ /** The total number of blocks that have been evicted */
+ private final AtomicLong evictedBlockCount = new AtomicLong(0);
+
+ public void miss(boolean caching) {
+ missCount.incrementAndGet();
+ if (caching) missCachingCount.incrementAndGet();
+ }
+
+ public void hit(boolean caching) {
+ hitCount.incrementAndGet();
+ if (caching) hitCachingCount.incrementAndGet();
+ }
+
+ public void evict() {
+ evictionCount.incrementAndGet();
+ }
+
+ public void evicted() {
+ evictedBlockCount.incrementAndGet();
+ }
+
+ public long getRequestCount() {
+ return getHitCount() + getMissCount();
+ }
+
+ public long getRequestCachingCount() {
+ return getHitCachingCount() + getMissCachingCount();
+ }
+
+ public long getMissCount() {
+ return missCount.get();
+ }
+
+ public long getMissCachingCount() {
+ return missCachingCount.get();
+ }
+
+ public long getHitCount() {
+ return hitCount.get();
+ }
+
+ public long getHitCachingCount() {
+ return hitCachingCount.get();
+ }
+
+ public long getEvictionCount() {
+ return evictionCount.get();
+ }
+
+ public long getEvictedCount() {
+ return evictedBlockCount.get();
+ }
+
+ public double getHitRatio() {
+ return ((float)getHitCount()/(float)getRequestCount());
+ }
+
+ public double getHitCachingRatio() {
+ return ((float)getHitCachingCount()/(float)getRequestCachingCount());
+ }
+
+ public double getMissRatio() {
+ return ((float)getMissCount()/(float)getRequestCount());
+ }
+
+ public double getMissCachingRatio() {
+ return ((float)getMissCachingCount()/(float)getRequestCachingCount());
+ }
+
+ public double evictedPerEviction() {
+ return ((float)getEvictedCount()/(float)getEvictionCount());
+ }
+}
\ No newline at end of file
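A small sketch of how these counters compose, assuming one caching-enabled read that hits and one caching-disabled read that misses; the wrapper class and main() are illustrative only:

    import org.apache.hadoop.hbase.io.hfile.CacheStats;

    public class CacheStatsExample {
      public static void main(String[] args) {
        CacheStats stats = new CacheStats();
        stats.hit(true);   // a caching-enabled read that hit
        stats.miss(false); // a caching-disabled read that missed
        // The overall ratio counts both reads; the caching ratio only the first.
        System.out.println(stats.getRequestCount());        // 2
        System.out.println(stats.getRequestCachingCount()); // 1
        System.out.println(stats.getHitCachingRatio());     // 1.0
      }
    }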
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hbase.io.HeapSize;
+
+/**
+ * Cacheable is an interface that allows for an object to be cached. If using an
+ * on-heap cache, just use heapSize(). If using an off-heap cache, Cacheable
+ * provides methods for serialization of the object.
+ *
+ * Some objects cannot be moved off heap; those objects will return a
+ * getSerializedLength() of 0.
+ *
+ */
+public interface Cacheable extends HeapSize {
+ /**
+ * Returns the length of the ByteBuffer required to serialize the object. If the
+ * object cannot be serialized, this method should return 0.
+ *
+ * @return int length in bytes of the serialized form.
+ */
+
+ public int getSerializedLength();
+
+ /**
+ * Serializes its data into destination.
+ */
+ public void serialize(ByteBuffer destination);
+
+ /**
+ * Returns CacheableDeserializer instance which reconstructs original object from ByteBuffer.
+ *
+ * @return CacheableDeserializer instance.
+ */
+ public CacheableDeserializer<Cacheable> getDeserializer();
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializer.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,17 @@
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Interface for a deserializer. deserialize() throws an IOException if the
+ * serialized data is incomplete or wrong.
+ */
+public interface CacheableDeserializer<T extends Cacheable> {
+ /**
+ * Returns the deserialized object.
+ *
+ * @return T the deserialized object.
+ */
+ public T deserialize(ByteBuffer b) throws IOException;
+}
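To make the two contracts above concrete, here is a hedged sketch of a trivial implementation; ByteArrayCacheable is hypothetical and exists only for illustration:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.io.hfile.Cacheable;
    import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;

    // Hypothetical Cacheable wrapping a byte array.
    public class ByteArrayCacheable implements Cacheable {
      private static final CacheableDeserializer<Cacheable> DESERIALIZER =
          new CacheableDeserializer<Cacheable>() {
            public ByteArrayCacheable deserialize(ByteBuffer b) {
              byte[] copy = new byte[b.limit()];
              b.rewind();
              b.get(copy);
              return new ByteArrayCacheable(copy);
            }
          };

      private final byte[] data;

      public ByteArrayCacheable(byte[] data) { this.data = data; }

      public int getSerializedLength() { return data.length; }

      public void serialize(ByteBuffer destination) {
        destination.put(data);
        destination.rewind();
      }

      public CacheableDeserializer<Cacheable> getDeserializer() {
        return DESERIALIZER;
      }

      // Rough estimate; a real implementation would use ClassSize.
      public long heapSize() { return data.length; }
    }

Caching the serialized form rather than the object itself is what lets an off-heap cache keep blocks in DirectByteBuffers outside the Java heap.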
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/CachedBlock.java Fri Aug 26 18:53:00 2011
@@ -19,8 +19,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import java.nio.ByteBuffer;
-
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -55,16 +53,16 @@ public class CachedBlock implements Heap
};
private final String blockName;
- private final HeapSize buf;
+ private final Cacheable buf;
private volatile long accessTime;
private long size;
private BlockPriority priority;
- public CachedBlock(String blockName, HeapSize buf, long accessTime) {
+ public CachedBlock(String blockName, Cacheable buf, long accessTime) {
this(blockName, buf, accessTime, false);
}
- public CachedBlock(String blockName, HeapSize buf, long accessTime,
+ public CachedBlock(String blockName, Cacheable buf, long accessTime,
boolean inMemory) {
this.blockName = blockName;
this.buf = buf;
@@ -97,7 +95,7 @@ public class CachedBlock implements Heap
return this.accessTime < that.accessTime ? 1 : -1;
}
- public HeapSize getBuffer() {
+ public Cacheable getBuffer() {
return this.buf;
}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/DoubleBlockCache.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,168 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.slab.SlabCache;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * DoubleBlockCache is an abstraction layer that combines two caches, the
+ * smaller onHeapCache and the larger offHeapCache. cacheBlock() attempts to cache
+ * the block in both caches, while getBlock() reads first from the faster on-heap
+ * cache before looking for the block in the off-heap cache. Metrics are the
+ * combined size, hits, and misses of both caches.
+ *
+ **/
+public class DoubleBlockCache implements BlockCache, HeapSize {
+
+ static final Log LOG = LogFactory.getLog(DoubleBlockCache.class.getName());
+
+ private final LruBlockCache onHeapCache;
+ private final SlabCache offHeapCache;
+ private final CacheStats stats;
+
+
+ /**
+ * Default constructor. Specify the sizes of the on-heap and off-heap caches
+ * and the expected average block size of each (approximations are fine).
+ * <p>
+ * All other factors will be calculated based on defaults specified in this
+ * class.
+ *
+ * @param onHeapSize
+ * maximum size of the on-heap cache, in bytes
+ * @param offHeapSize
+ * maximum size of the off-heap cache, in bytes
+ * @param blockSizeLru
+ * approximate size of each on-heap block, in bytes
+ * @param blockSizeSlab
+ * approximate size of each off-heap block, in bytes
+ */
+ public DoubleBlockCache(long onHeapSize, long offHeapSize, long blockSizeLru,
+ long blockSizeSlab) {
+
+ LOG.info("Creating on-heap cache of size "
+ + StringUtils.humanReadableInt(onHeapSize)
+ + "bytes with an average block size of "
+ + StringUtils.humanReadableInt(blockSizeLru) + " bytes.");
+ onHeapCache = new LruBlockCache(onHeapSize, blockSizeLru);
+
+ LOG.info("Creating off-heap cache of size "
+ + StringUtils.humanReadableInt(offHeapSize)
+ + "bytes with an average block size of "
+ + StringUtils.humanReadableInt(blockSizeSlab) + " bytes.");
+ offHeapCache = new SlabCache(offHeapSize, blockSizeSlab);
+
+ this.stats = new CacheStats();
+ }
+
+ @Override
+ public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
+ onHeapCache.cacheBlock(blockName, buf, inMemory);
+ offHeapCache.cacheBlock(blockName, buf);
+ }
+
+ @Override
+ public void cacheBlock(String blockName, Cacheable buf) {
+ onHeapCache.cacheBlock(blockName, buf);
+ offHeapCache.cacheBlock(blockName, buf);
+ }
+
+ @Override
+ public Cacheable getBlock(String blockName, boolean caching) {
+ Cacheable cachedBlock;
+
+ if ((cachedBlock = onHeapCache.getBlock(blockName, caching)) != null) {
+ stats.hit(caching);
+ return cachedBlock;
+
+ } else if ((cachedBlock = offHeapCache.getBlock(blockName, caching)) != null) {
+ if (caching) {
+ onHeapCache.cacheBlock(blockName, cachedBlock);
+ }
+ stats.hit(caching);
+ return cachedBlock;
+ }
+
+ stats.miss(caching);
+ return null;
+ }
+
+ @Override
+ public boolean evictBlock(String blockName) {
+ stats.evict();
+ boolean cacheA = onHeapCache.evictBlock(blockName);
+ boolean cacheB = offHeapCache.evictBlock(blockName);
+ boolean evicted = cacheA || cacheB;
+ if (evicted) {
+ stats.evicted();
+ }
+ return evicted;
+ }
+
+ @Override
+ public CacheStats getStats() {
+ return this.stats;
+ }
+
+ @Override
+ public void shutdown() {
+ onHeapCache.shutdown();
+ offHeapCache.shutdown();
+ }
+
+ @Override
+ public long heapSize() {
+ return onHeapCache.heapSize() + offHeapCache.heapSize();
+ }
+
+ public long size() {
+ return onHeapCache.size() + offHeapCache.size();
+ }
+
+ public long getFreeSize() {
+ return onHeapCache.getFreeSize() + offHeapCache.getFreeSize();
+ }
+
+ public long getCurrentSize() {
+ return onHeapCache.getCurrentSize() + offHeapCache.getCurrentSize();
+ }
+
+ public long getEvictedCount() {
+ return onHeapCache.getEvictedCount() + offHeapCache.getEvictedCount();
+ }
+
+ @Override
+ public int evictBlocksByPrefix(String prefix) {
+ return onHeapCache.evictBlocksByPrefix(prefix)
+ + offHeapCache.evictBlocksByPrefix(prefix);
+ }
+
+ @Override
+ public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
+ Configuration conf) throws IOException {
+ return onHeapCache.getBlockCacheColumnFamilySummaries(conf);
+ }
+
+}
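A usage sketch, assuming a 256 MB on-heap tier and a 1 GB off-heap tier tuned for ~64 KB blocks; the sizes are arbitrary illustrations and ByteArrayCacheable is the hypothetical class sketched earlier:

    long mb = 1024L * 1024L;
    DoubleBlockCache cache =
        new DoubleBlockCache(256 * mb, 1024 * mb, 64 * 1024, 64 * 1024);

    Cacheable block = new ByteArrayCacheable("some bytes".getBytes());
    cache.cacheBlock("fileA_block0", block);                // tries both tiers
    Cacheable found = cache.getBlock("fileA_block0", true); // on-heap first
    cache.shutdown();                                       // releases direct memory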
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Fri Aug 26 18:53:00 2011
@@ -29,12 +29,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.io.DoubleOutputStream;
-import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -75,7 +73,7 @@ import static org.apache.hadoop.hbase.io
* The version 2 block representation in the block cache is the same as above,
* except that the data section is always uncompressed in the cache.
*/
-public class HFileBlock implements HeapSize {
+public class HFileBlock implements Cacheable {
/** The size of a version 2 {@link HFile} block header */
public static final int HEADER_SIZE = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT
@@ -87,6 +85,27 @@ public class HFileBlock implements HeapS
public static final int BYTE_BUFFER_HEAP_SIZE = (int) ClassSize.estimateBase(
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
+ static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT;
+
+
+ private static final CacheableDeserializer<Cacheable> blockDeserializer =
+ new CacheableDeserializer<Cacheable>() {
+ public HFileBlock deserialize(ByteBuffer buf) throws IOException{
+ ByteBuffer tempCopy = buf.duplicate();
+ ByteBuffer newByteBuffer = ByteBuffer.allocate(tempCopy.limit()
+ - HFileBlock.EXTRA_SERIALIZATION_SPACE);
+ tempCopy.limit(tempCopy.limit()
+ - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
+ newByteBuffer.put(tempCopy);
+ HFileBlock ourBuffer = new HFileBlock(newByteBuffer);
+
+ tempCopy.position(tempCopy.limit());
+ tempCopy.limit(tempCopy.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
+ ourBuffer.offset = tempCopy.getLong();
+ ourBuffer.nextBlockOnDiskSizeWithHeader = tempCopy.getInt();
+ return ourBuffer;
+ }
+ };
private BlockType blockType;
private final int onDiskSizeWithoutHeader;
private final int uncompressedSizeWithoutHeader;
@@ -398,9 +417,20 @@ public class HFileBlock implements HeapS
// uncompressed size, next block's on-disk size, offset and previous
// offset, byte buffer object, and its byte array. Might also need to add
// some fields inside the byte buffer.
- return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
- * Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE) +
- ClassSize.align(buf.capacity());
+
+ // We only add one BYTE_BUFFER_HEAP_SIZE because at any given moment, one of
+ // the bytebuffers will be null. But we do account for both references.
+
+ // If we are on heap, then we add the capacity of buf.
+ if (buf != null) {
+ return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
+ * Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE)
+ + ClassSize.align(buf.capacity());
+ } else {
+
+ return ClassSize.align(ClassSize.OBJECT + 2 * ClassSize.REFERENCE + 3
+ * Bytes.SIZEOF_INT + 2 * Bytes.SIZEOF_LONG + BYTE_BUFFER_HEAP_SIZE);
+ }
}
/**
@@ -1438,4 +1468,70 @@ public class HFileBlock implements HeapS
}
-}
+ @Override
+ public int getSerializedLength() {
+ if (buf != null) {
+ return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
+ }
+ return 0;
+ }
+
+ @Override
+ public void serialize(ByteBuffer destination) {
+ destination.put(this.buf.duplicate());
+ destination.putLong(this.offset);
+ destination.putInt(this.nextBlockOnDiskSizeWithHeader);
+ destination.rewind();
+ }
+
+ @Override
+ public CacheableDeserializer<Cacheable> getDeserializer() {
+ return HFileBlock.blockDeserializer;
+ }
+
+ @Override
+ public boolean equals(Object comparison) {
+ if (this == comparison) {
+ return true;
+ }
+ if (comparison == null) {
+ return false;
+ }
+ if (comparison.getClass() != this.getClass()) {
+ return false;
+ }
+
+ HFileBlock castedComparison = (HFileBlock) comparison;
+
+ if (castedComparison.blockType != this.blockType) {
+ return false;
+ }
+ if (castedComparison.nextBlockOnDiskSizeWithHeader != this.nextBlockOnDiskSizeWithHeader) {
+ return false;
+ }
+ if (castedComparison.offset != this.offset) {
+ return false;
+ }
+ if (castedComparison.onDiskSizeWithoutHeader != this.onDiskSizeWithoutHeader) {
+ return false;
+ }
+ if (castedComparison.prevBlockOffset != this.prevBlockOffset) {
+ return false;
+ }
+ if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) {
+ return false;
+ }
+ if (this.buf.compareTo(castedComparison.buf) != 0) {
+ return false;
+ }
+ if (this.buf.position() != castedComparison.buf.position()){
+ return false;
+ }
+ if (this.buf.limit() != castedComparison.buf.limit()){
+ return false;
+ }
+ return true;
+ }
+
+
+}
\ No newline at end of file
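The serialized form is the block's buffer followed by EXTRA_SERIALIZATION_SPACE trailing bytes: the block offset as a long, then the next block's on-disk size as an int. A round trip through that form, the way an off-heap cache would perform it, might look like this sketch (roundTrip is a hypothetical helper):

    // Serialize any Cacheable into a fresh buffer and reconstruct it.
    static Cacheable roundTrip(Cacheable block) throws IOException {
      ByteBuffer buf = ByteBuffer.allocate(block.getSerializedLength());
      block.serialize(buf); // HFileBlock.serialize() rewinds the buffer
      return block.getDeserializer().deserialize(buf);
    }

For an HFileBlock, roundTrip(block).equals(block) should hold, which is what the equals() override above lets tests assert.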
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Fri Aug 26 18:53:00 2011
@@ -261,7 +261,7 @@ public class LruBlockCache implements Bl
* @param buf block buffer
* @param inMemory if block is in-memory
*/
- public void cacheBlock(String blockName, HeapSize buf, boolean inMemory) {
+ public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
CachedBlock cb = map.get(blockName);
if(cb != null) {
throw new RuntimeException("Cached an already cached block");
@@ -285,7 +285,7 @@ public class LruBlockCache implements Bl
* @param blockName block name
* @param buf block buffer
*/
- public void cacheBlock(String blockName, HeapSize buf) {
+ public void cacheBlock(String blockName, Cacheable buf) {
cacheBlock(blockName, buf, false);
}
@@ -294,7 +294,7 @@ public class LruBlockCache implements Bl
* @param blockName block name
* @return buffer of specified block name, or null if not in cache
*/
- public HeapSize getBlock(String blockName, boolean caching) {
+ public Cacheable getBlock(String blockName, boolean caching) {
CachedBlock cb = map.get(blockName);
if(cb == null) {
stats.miss(caching);
@@ -624,100 +624,7 @@ public class LruBlockCache implements Bl
public CacheStats getStats() {
return this.stats;
}
-
- public static class CacheStats {
- /** The number of getBlock requests that were cache hits */
- private final AtomicLong hitCount = new AtomicLong(0);
- /**
- * The number of getBlock requests that were cache hits, but only from
- * requests that were set to use the block cache. This is because all reads
- * attempt to read from the block cache even if they will not put new blocks
- * into the block cache. See HBASE-2253 for more information.
- */
- private final AtomicLong hitCachingCount = new AtomicLong(0);
- /** The number of getBlock requests that were cache misses */
- private final AtomicLong missCount = new AtomicLong(0);
- /**
- * The number of getBlock requests that were cache misses, but only from
- * requests that were set to use the block cache.
- */
- private final AtomicLong missCachingCount = new AtomicLong(0);
- /** The number of times an eviction has occurred */
- private final AtomicLong evictionCount = new AtomicLong(0);
- /** The total number of blocks that have been evicted */
- private final AtomicLong evictedCount = new AtomicLong(0);
-
- public void miss(boolean caching) {
- missCount.incrementAndGet();
- if (caching) missCachingCount.incrementAndGet();
- }
-
- public void hit(boolean caching) {
- hitCount.incrementAndGet();
- if (caching) hitCachingCount.incrementAndGet();
- }
-
- public void evict() {
- evictionCount.incrementAndGet();
- }
-
- public void evicted() {
- evictedCount.incrementAndGet();
- }
-
- public long getRequestCount() {
- return getHitCount() + getMissCount();
- }
-
- public long getRequestCachingCount() {
- return getHitCachingCount() + getMissCachingCount();
- }
-
- public long getMissCount() {
- return missCount.get();
- }
-
- public long getMissCachingCount() {
- return missCachingCount.get();
- }
-
- public long getHitCount() {
- return hitCount.get();
- }
-
- public long getHitCachingCount() {
- return hitCachingCount.get();
- }
-
- public long getEvictionCount() {
- return evictionCount.get();
- }
-
- public long getEvictedCount() {
- return evictedCount.get();
- }
-
- public double getHitRatio() {
- return ((float)getHitCount()/(float)getRequestCount());
- }
-
- public double getHitCachingRatio() {
- return ((float)getHitCachingCount()/(float)getRequestCachingCount());
- }
-
- public double getMissRatio() {
- return ((float)getMissCount()/(float)getRequestCount());
- }
-
- public double getMissCachingRatio() {
- return ((float)getMissCachingCount()/(float)getRequestCachingCount());
- }
-
- public double evictedPerEviction() {
- return ((float)getEvictedCount()/(float)getEvictionCount());
- }
- }
-
+
public final static long CACHE_FIXED_OVERHEAD = ClassSize.align(
(3 * Bytes.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) +
(5 * Bytes.SIZEOF_FLOAT) + Bytes.SIZEOF_BOOLEAN
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/SimpleBlockCache.java Fri Aug 26 18:53:00 2011
@@ -26,17 +26,15 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
/**
* Simple one RFile soft reference cache.
*/
public class SimpleBlockCache implements BlockCache {
- private static class Ref extends SoftReference<HeapSize> {
+ private static class Ref extends SoftReference<Cacheable> {
public String blockId;
- public Ref(String blockId, HeapSize block, ReferenceQueue q) {
+ public Ref(String blockId, Cacheable block, ReferenceQueue q) {
super(block, q);
this.blockId = blockId;
}
@@ -70,7 +68,7 @@ public class SimpleBlockCache implements
return cache.size();
}
- public synchronized HeapSize getBlock(String blockName, boolean caching) {
+ public synchronized Cacheable getBlock(String blockName, boolean caching) {
processQueue(); // clear out some crap.
Ref ref = cache.get(blockName);
if (ref == null)
@@ -78,11 +76,11 @@ public class SimpleBlockCache implements
return ref.get();
}
- public synchronized void cacheBlock(String blockName, HeapSize block) {
+ public synchronized void cacheBlock(String blockName, Cacheable block) {
cache.put(blockName, new Ref(blockName, block, q));
}
- public synchronized void cacheBlock(String blockName, HeapSize block,
+ public synchronized void cacheBlock(String blockName, Cacheable block,
boolean inMemory) {
cache.put(blockName, new Ref(blockName, block, q));
}
@@ -124,7 +122,7 @@ public class SimpleBlockCache implements
public int evictBlocksByPrefix(String string) {
throw new UnsupportedOperationException();
}
-
+
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(Configuration conf) {
throw new UnsupportedOperationException();
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SingleSizeCache.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,306 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.slab;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.collect.MapEvictionListener;
+import com.google.common.collect.MapMaker;
+
+/**
+ * SingleSizeCache is a slab-allocated cache that caches elements up to a single
+ * size. It uses a slab allocator (Slab.java) to divide a direct ByteBuffer
+ * into evenly sized blocks. Any cached data will take up exactly 1 block. An
+ * exception will be thrown if the cached data cannot fit into the blockSize of
+ * this SingleSizeCache.
+ *
+ * Eviction and LRUness is taken care of by Guava's MapMaker, which creates a
+ * ConcurrentLinkedHashMap.
+ *
+ **/
+public class SingleSizeCache implements BlockCache {
+ private final Slab backingStore;
+ private final ConcurrentMap<String, CacheablePair> backingMap;
+ private final int numBlocks;
+ private final int blockSize;
+ private final CacheStats stats;
+ private final SlabItemEvictionWatcher evictionWatcher;
+ private AtomicLong size;
+ public final static long CACHE_FIXED_OVERHEAD = ClassSize
+ .align((2 * Bytes.SIZEOF_INT) + (5 * ClassSize.REFERENCE)
+ + ClassSize.OBJECT);
+
+ static final Log LOG = LogFactory.getLog(SingleSizeCache.class);
+
+ /**
+ * Default constructor. Specify the size of the blocks, number of blocks, and
+ * the SlabCache this cache will be assigned to.
+ *
+ *
+ * @param blockSize the size of each block, in bytes
+ *
+ * @param numBlocks the number of blocks of blockSize this cache will hold.
+ *
+ * @param master the SlabCache this SingleSlabCache is assigned to.
+ */
+ public SingleSizeCache(int blockSize, int numBlocks,
+ SlabItemEvictionWatcher master) {
+ this.blockSize = blockSize;
+ this.numBlocks = numBlocks;
+ backingStore = new Slab(blockSize, numBlocks);
+ this.stats = new CacheStats();
+ this.evictionWatcher = master;
+ this.size = new AtomicLong(CACHE_FIXED_OVERHEAD + backingStore.heapSize());
+
+ // This evictionListener is called whenever the cache automatically evicts
+ // something.
+ MapEvictionListener<String, CacheablePair> listener = new MapEvictionListener<String, CacheablePair>() {
+ @Override
+ public void onEviction(String key, CacheablePair value) {
+ value.evictionLock.writeLock().lock();
+ try {
+ backingStore.free(value.serializedData);
+ stats.evict();
+ /**
+ * We may choose to run this cache alone, without the SlabCache on
+ * top; there is no evictionWatcher in that case.
+ */
+ if (evictionWatcher != null) {
+ evictionWatcher.onEviction(key, false);
+ }
+ size.addAndGet(-1 * value.heapSize());
+ stats.evicted();
+ } finally {
+ value.evictionLock.writeLock().unlock();
+ }
+ }
+ };
+
+ backingMap = new MapMaker().maximumSize(numBlocks - 1)
+ .evictionListener(listener).makeMap();
+
+ }
+
+ @Override
+ public synchronized void cacheBlock(String blockName, Cacheable toBeCached) {
+ ByteBuffer storedBlock = backingStore.alloc(toBeCached
+ .getSerializedLength());
+
+ CacheablePair newEntry = new CacheablePair(toBeCached.getDeserializer(),
+ storedBlock);
+
+ CacheablePair alreadyCached = backingMap.putIfAbsent(blockName, newEntry);
+
+ if (alreadyCached != null) {
+ backingStore.free(storedBlock);
+ throw new RuntimeException("already cached " + blockName);
+ }
+ toBeCached.serialize(storedBlock);
+ this.size.addAndGet(newEntry.heapSize());
+ }
+
+ @Override
+ public Cacheable getBlock(String key, boolean caching) {
+ CacheablePair contentBlock = backingMap.get(key);
+ if (contentBlock == null) {
+ stats.miss(caching);
+ return null;
+ }
+
+ stats.hit(caching);
+ // If lock cannot be obtained, that means we're undergoing eviction.
+ if (contentBlock.evictionLock.readLock().tryLock()) {
+ try {
+ return contentBlock.deserializer
+ .deserialize(contentBlock.serializedData);
+ } catch (IOException e) {
+ LOG.warn("Deserializer threw an IOException, possibly deserializing the wrong object buffer", e);
+ return null;
+ } finally {
+ contentBlock.evictionLock.readLock().unlock();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Evicts the block.
+ *
+ * @param key the key of the entry we are going to evict
+ * @return true if an entry was evicted
+ */
+ public boolean evictBlock(String key) {
+ stats.evict();
+ CacheablePair evictedBlock = backingMap.remove(key);
+ if (evictedBlock != null) {
+ evictedBlock.evictionLock.writeLock().lock();
+ try {
+ backingStore.free(evictedBlock.serializedData);
+ // The watcher is null when this cache is run standalone.
+ if (evictionWatcher != null) {
+ evictionWatcher.onEviction(key, false);
+ }
+ stats.evicted();
+ size.addAndGet(-1 * evictedBlock.heapSize());
+ } finally {
+ evictedBlock.evictionLock.writeLock().unlock();
+ }
+ }
+ return evictedBlock != null;
+
+ }
+
+ public void logStats() {
+
+ LOG.info("For Slab of size " + this.blockSize + ": "
+ + this.getOccupiedSize() / this.blockSize
+ + " occupied, out of a capacity of " + this.numBlocks
+ + " blocks. HeapSize is "
+ + StringUtils.humanReadableInt(this.heapSize()) + " bytes.");
+
+ LOG.debug("Slab Stats: " + "accesses="
+ + stats.getRequestCount()
+ + ", "
+ + "hits="
+ + stats.getHitCount()
+ + ", "
+ + "hitRatio="
+ + (stats.getHitCount() == 0 ? "0" : (StringUtils.formatPercent(
+ stats.getHitRatio(), 2) + "%, "))
+ + "cachingAccesses="
+ + stats.getRequestCachingCount()
+ + ", "
+ + "cachingHits="
+ + stats.getHitCachingCount()
+ + ", "
+ + "cachingHitsRatio="
+ + (stats.getHitCachingCount() == 0 ? "0" : (StringUtils.formatPercent(
+ stats.getHitCachingRatio(), 2) + "%, ")) + "evictions="
+ + stats.getEvictionCount() + ", " + "evicted="
+ + stats.getEvictedCount() + ", " + "evictedPerRun="
+ + stats.evictedPerEviction());
+
+ }
+
+ public void shutdown() {
+ backingStore.shutdown();
+ }
+
+ public long heapSize() {
+ return this.size() + backingStore.heapSize();
+ }
+
+ public long size() {
+ return (long) this.blockSize * this.numBlocks;
+ }
+
+ public long getFreeSize() {
+ return (long) backingStore.getBlocksRemaining() * blockSize;
+ }
+
+ public long getOccupiedSize() {
+ return (long) (numBlocks - backingStore.getBlocksRemaining()) * blockSize;
+ }
+
+ public long getEvictedCount() {
+ return stats.getEvictedCount();
+ }
+
+ public CacheStats getStats() {
+ return this.stats;
+ }
+
+ /* Since it's off-heap, it doesn't matter if it's in-memory or not */
+ @Override
+ public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
+ this.cacheBlock(blockName, buf);
+ }
+
+ /*
+ * This is never called, as evictions are handled in the SlabCache layer;
+ * it is implemented in the event we want to use this as a standalone cache.
+ */
+ @Override
+ public int evictBlocksByPrefix(String prefix) {
+ int evictedCount = 0;
+ for (String e : backingMap.keySet()) {
+ if (e.startsWith(prefix)) {
+ this.evictBlock(e);
+ evictedCount++;
+ }
+ }
+ return evictedCount;
+ }
+
+ @Override
+ public long getCurrentSize() {
+ return 0;
+ }
+
+ /*
+ * Not implemented. Extremely costly to do this from the off-heap cache; you'd
+ * need to copy every object on heap once.
+ */
+ @Override
+ public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
+ Configuration conf) {
+ throw new UnsupportedOperationException();
+ }
+
+ /* Just a pair class, holds a reference to the parent cacheable */
+ private class CacheablePair implements HeapSize {
+ final CacheableDeserializer<Cacheable> deserializer;
+ final ByteBuffer serializedData;
+ final ReentrantReadWriteLock evictionLock;
+
+ private CacheablePair(CacheableDeserializer<Cacheable> deserializer,
+ ByteBuffer serializedData) {
+ this.deserializer = deserializer;
+ this.serializedData = serializedData;
+ evictionLock = new ReentrantReadWriteLock();
+ }
+
+ /*
+ * Heapsize overhead of this is the default object overhead, the heapsize of
+ * the serialized object, and the cost of a reference to the bytebuffer,
+ * which is already accounted for in SingleSizeCache
+ */
+ @Override
+ public long heapSize() {
+ return ClassSize.align(ClassSize.OBJECT + ClassSize.REFERENCE * 3
+ + ClassSize.REENTRANT_LOCK);
+ }
+ }
+}
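As the eviction listener's comment notes, this cache can also run standalone with no SlabCache on top. A sketch under that assumption, with illustrative sizes and the hypothetical ByteArrayCacheable from earlier:

    // 1000 off-heap blocks of 16 KB each; a null watcher means standalone use.
    SingleSizeCache cache = new SingleSizeCache(16 * 1024, 1000, null);
    Cacheable block = new ByteArrayCacheable(new byte[100]);
    cache.cacheBlock("fileA_block0", block); // serialized into one slab slot
    Cacheable copy = cache.getBlock("fileA_block0", true); // deserialized copy
    cache.shutdown(); // frees the backing direct memory

Note that getBlock() hands back a freshly deserialized copy rather than the cached buffer itself, so callers never touch the slab slot after the eviction lock is released.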
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/Slab.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,131 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.slab;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.DirectMemoryUtils;
+import com.google.common.base.Preconditions;
+
+/**
+ * Slab is a class which is designed to allocate blocks of a certain size.
+ * The constructor creates a number of DirectByteBuffers, slices them into
+ * blocks of the requisite size, and queues the slices for allocation.
+ **/
+
+class Slab implements org.apache.hadoop.hbase.io.HeapSize {
+ static final Log LOG = LogFactory.getLog(Slab.class);
+
+ /** This is where our items, or blocks of the slab, are stored. */
+ private ConcurrentLinkedQueue<ByteBuffer> buffers;
+
+ /** This is where our Slabs are stored */
+ private ConcurrentLinkedQueue<ByteBuffer> slabs;
+
+ private final int blockSize;
+ private final int numBlocks;
+ private long heapSize;
+
+ Slab(int blockSize, int numBlocks) {
+ buffers = new ConcurrentLinkedQueue<ByteBuffer>();
+ slabs = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ this.blockSize = blockSize;
+ this.numBlocks = numBlocks;
+
+ this.heapSize = ClassSize.estimateBase(this.getClass(), false);
+
+ int maxBlocksPerSlab = Integer.MAX_VALUE / blockSize;
+ int maxSlabSize = maxBlocksPerSlab * blockSize;
+
+ int numFullSlabs = numBlocks / maxBlocksPerSlab;
+ int partialSlabSize = (numBlocks % maxBlocksPerSlab) * blockSize;
+ for (int i = 0; i < numFullSlabs; i++) {
+ allocateAndSlice(maxSlabSize, blockSize);
+ }
+
+ if (partialSlabSize > 0) {
+ allocateAndSlice(partialSlabSize, blockSize);
+ }
+ }
+
+ private void allocateAndSlice(int size, int sliceSize) {
+ ByteBuffer newSlab = ByteBuffer.allocateDirect(size);
+ slabs.add(newSlab);
+ for (int j = 0; j < newSlab.capacity(); j += sliceSize) {
+ newSlab.limit(j + sliceSize).position(j);
+ ByteBuffer aSlice = newSlab.slice();
+ buffers.add(aSlice);
+ heapSize += ClassSize.estimateBase(aSlice.getClass(), false);
+ }
+ }
+
+ /*
+ * Shutdown deallocates the memory for all the DirectByteBuffers. Each
+ * DirectByteBuffer has a "cleaner" method, which is similar to a
+ * destructor in C++.
+ */
+ void shutdown() {
+ for (ByteBuffer aSlab : slabs) {
+ try {
+ DirectMemoryUtils.destroyDirectByteBuffer(aSlab);
+ } catch (Exception e) {
+ LOG.warn("Unable to deallocate direct memory during shutdown", e);
+ }
+ }
+ }
+
+ int getBlockSize() {
+ return this.blockSize;
+ }
+
+ int getBlockCapacity() {
+ return this.numBlocks;
+ }
+
+ int getBlocksRemaining() {
+ return this.buffers.size();
+ }
+
+ /*
+ * This spin-waits if the slab is empty. Make sure your program can deal with
+ * that, and will complete evictions in time.
+ */
+ ByteBuffer alloc(int bufferSize) {
+ int newCapacity = Preconditions.checkPositionIndex(bufferSize, blockSize);
+ while (buffers.isEmpty()); // Spinlock
+ ByteBuffer returnedBuffer = buffers.remove();
+ returnedBuffer.clear().limit(newCapacity);
+ return returnedBuffer;
+ }
+
+ void free(ByteBuffer toBeFreed) {
+ Preconditions.checkArgument(toBeFreed.capacity() == blockSize);
+ buffers.add(toBeFreed);
+ }
+
+ @Override
+ public long heapSize() {
+ return heapSize;
+ }
+}
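A sketch of the allocate/free cycle (sizes illustrative; Slab is package-private, so this would live in org.apache.hadoop.hbase.io.hfile.slab):

    Slab slab = new Slab(4096, 10);     // ten 4 KB blocks of direct memory
    ByteBuffer block = slab.alloc(100); // one block, limit clamped to 100 bytes
    block.put(new byte[100]);           // write into the slice
    slab.free(block);                   // return the block for reuse
    slab.shutdown();                    // deallocate the direct buffers

Because alloc() spin-waits when the slab is empty, a caller must guarantee that frees (evictions) eventually happen.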
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabCache.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,392 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.slab;
+
+import java.math.BigDecimal;
+import java.util.Map.Entry;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * SlabCache is composed of multiple SingleSizeCaches. It uses a TreeMap in
+ * order to determine where a given element fits. Redirects gets and puts to the
+ * correct SingleSizeCache.
+ *
+ **/
+public class SlabCache implements SlabItemEvictionWatcher, BlockCache, HeapSize {
+
+ private final ConcurrentHashMap<String, SingleSizeCache> backingStore;
+ private final TreeMap<Integer, SingleSizeCache> sizer;
+ static final Log LOG = LogFactory.getLog(SlabCache.class);
+ static final int STAT_THREAD_PERIOD_SECS = 60 * 5;
+
+ private final ScheduledExecutorService scheduleThreadPool = Executors
+ .newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat("Slab Statistics #%d")
+ .build());
+
+ long size;
+ private final CacheStats stats;
+ final SlabStats slabstats;
+ private final long avgBlockSize;
+ private static final long CACHE_FIXED_OVERHEAD = ClassSize.estimateBase(
+ SlabCache.class, false);
+
+ /**
+ * Default constructor, creates an empty SlabCache.
+ *
+ * @param size Total size allocated to the SlabCache. (Bytes)
+ * @param avgBlockSize Average size of a block being cached.
+ **/
+
+ public SlabCache(long size, long avgBlockSize) {
+ this.avgBlockSize = avgBlockSize;
+ this.size = size;
+ this.stats = new CacheStats();
+ this.slabstats = new SlabStats();
+ backingStore = new ConcurrentHashMap<String, SingleSizeCache>();
+ sizer = new TreeMap<Integer, SingleSizeCache>();
+ this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
+ STAT_THREAD_PERIOD_SECS, STAT_THREAD_PERIOD_SECS, TimeUnit.SECONDS);
+
+ }
+
+ /**
+ * A way of allocating the desired number of Slabs of each particular size.
+ *
+ * This reads two lists from conf, hbase.offheapcache.slab.proportions and
+ * hbase.offheapcache.slab.sizes.
+ *
+ * The first list is the proportion of our total space we allocate to each
+ * slab.
+ *
+ * The second list is the block size of each slab in bytes (i.e. each slab
+ * holds blocks of this size).
+ *
+ * @param conf the Configuration to read the slab settings from.
+ */
+ public void addSlabByConf(Configuration conf) {
+ // Proportions we allocate to each slab of the total size.
+ String[] proportions = conf.getStrings(
+ "hbase.offheapcache.slab.proportions", "0.80", "0.20");
+ String[] sizes = conf.getStrings("hbase.offheapcache.slab.sizes", new Long(
+ avgBlockSize * 11 / 10).toString(), new Long(avgBlockSize * 21 / 10)
+ .toString());
+
+ if (proportions.length != sizes.length) {
+ throw new IllegalArgumentException(
+ "SlabCache conf not initialized, error in configuration: "
+ + "hbase.offheapcache.slab.proportions specifies "
+ + proportions.length
+ + " slabs while hbase.offheapcache.slab.sizes specifies "
+ + sizes.length + " slabs");
+ }
+ /* We use BigDecimals instead of floats because float rounding is annoying */
+
+ BigDecimal[] parsedProportions = stringArrayToBigDecimalArray(proportions);
+ BigDecimal[] parsedSizes = stringArrayToBigDecimalArray(sizes);
+
+ BigDecimal sumProportions = new BigDecimal(0);
+ for (BigDecimal b : parsedProportions) {
+ /* Make sure all proportions are greater than 0 */
+ Preconditions
+ .checkArgument(b.compareTo(BigDecimal.ZERO) == 1,
+ "Proportions in hbase.offheap.slab.proportions must be greater than 0!");
+ sumProportions = sumProportions.add(b);
+ }
+
+ /* If the sum is greater than 1 */
+ Preconditions
+ .checkArgument(sumProportions.compareTo(BigDecimal.ONE) != 1,
+ "Sum of all proportions in hbase.offheap.slab.proportions must be less than 1");
+
+ /* If the sum of all proportions is less than 0.99 */
+ if (sumProportions.compareTo(new BigDecimal("0.99")) == -1) {
+ LOG.warn("Sum of hbase.offheap.slab.proportions is less than 0.99! Memory is being wasted");
+ }
+ for (int i = 0; i < parsedProportions.length; i++) {
+ int blockSize = parsedSizes[i].intValue();
+ int numBlocks = new BigDecimal(this.size).multiply(parsedProportions[i])
+ .divide(parsedSizes[i], BigDecimal.ROUND_DOWN).intValue();
+ addSlab(blockSize, numBlocks);
+ }
+ }
+
+ /**
+ * Gets the SingleSizeCache that a ByteBuffer of the given size would be
+ * allocated to.
+ *
+ * @param size Size of the ByteBuffer we are checking.
+ *
+ * @return the entry for the cache the above ByteBuffer would be allocated
+ * towards, or null if the object is too large to fit in any slab.
+ */
+ Entry<Integer, SingleSizeCache> getHigherBlock(int size) {
+ return sizer.higherEntry(size - 1);
+ }
+
+ private BigDecimal[] stringArrayToBigDecimalArray(String[] parsee) {
+ BigDecimal[] parsed = new BigDecimal[parsee.length];
+ for (int i = 0; i < parsee.length; i++) {
+ parsed[i] = new BigDecimal(parsee[i].trim());
+ }
+ return parsed;
+ }
+
+ private void addSlab(int blockSize, int numBlocks) {
+ sizer.put(blockSize, new SingleSizeCache(blockSize, numBlocks, this));
+ }
+
+ /**
+ * Cache the block with the specified name and buffer. First finds what size
+ * SingleSlabCache it should fit in. If the block doesn't fit in any, it will
+ * return without doing anything.
+ * <p>
+ * It is assumed this will NEVER be called on an already cached block. If that
+ * is done, it is assumed that you are reinserting the same exact block due to
+ * a race condition, and a runtime exception will be thrown.
+ *
+ * @param blockName block name
+ * @param cachedItem block buffer
+ */
+ public void cacheBlock(String blockName, Cacheable cachedItem) {
+ Entry<Integer, SingleSizeCache> scacheEntry = getHigherBlock(cachedItem
+ .getSerializedLength());
+
+ this.slabstats.addin(cachedItem.getSerializedLength());
+
+ if (scacheEntry == null) {
+ return; // we can't cache, something too big.
+ }
+
+ SingleSizeCache scache = scacheEntry.getValue();
+ // If this fails due to the block already being there, an
+ // exception will be thrown.
+ scache.cacheBlock(blockName, cachedItem);
+ backingStore.put(blockName, scache);
+ }
+
+ /**
+ * This cache makes no distinction between in-memory and regular blocks, so
+ * we just pass the call through.
+ */
+ public void cacheBlock(String blockName, Cacheable buf, boolean inMemory) {
+ cacheBlock(blockName, buf);
+ }
+
+ public CacheStats getStats() {
+ return this.stats;
+ }
+
+ /**
+ * Get the buffer of the block with the specified name.
+ *
+ * @param key block name
+ * @param caching true if the caller will cache blocks on a miss
+ * @return buffer of the specified block, or null if not in cache
+ */
+ public Cacheable getBlock(String key, boolean caching) {
+ SingleSizeCache cachedBlock = backingStore.get(key);
+ if (cachedBlock == null) {
+ return null;
+ }
+
+ Cacheable contentBlock = cachedBlock.getBlock(key, caching);
+
+ if (contentBlock != null) {
+ stats.hit(caching);
+ } else {
+ stats.miss(caching);
+ }
+ return contentBlock;
+ }
+
+ /**
+ * Evicts a block from the cache. This is public, and thus contributes to
+ * the evict counter.
+ */
+ public boolean evictBlock(String key) {
+ stats.evict();
+ return onEviction(key, true);
+ }
+
+ @Override
+ public boolean onEviction(String key, boolean callAssignedCache) {
+ SingleSizeCache cacheEntry = backingStore.remove(key);
+ if (cacheEntry == null) {
+ return false;
+ }
+ /* If the eviction originated in the assigned cache, evictBlock was never
+ called, so we need to bump stats.evict here. */
+ if (!callAssignedCache) {
+ stats.evict();
+ }
+ stats.evicted();
+ if (callAssignedCache) {
+ cacheEntry.evictBlock(key);
+ }
+ return true;
+ }
+
+ /**
+ * Sends a shutdown to all SingleSizeCaches contained by this cache.
+ */
+ public void shutdown() {
+ for (SingleSizeCache s : sizer.values()) {
+ s.shutdown();
+ }
+ }
+
+ public long heapSize() {
+ long childCacheSize = 0;
+ for (SingleSizeCache s : sizer.values()) {
+ childCacheSize += s.heapSize();
+ }
+ return SlabCache.CACHE_FIXED_OVERHEAD + childCacheSize;
+ }
+
+ public long size() {
+ return this.size;
+ }
+
+ public long getFreeSize() {
+ return 0; // this cache, by default, allocates all its space.
+ }
+
+ public long getCurrentSize() {
+ return size;
+ }
+
+ public long getEvictedCount() {
+ return stats.getEvictedCount();
+ }
+
+ /*
+ * Statistics thread. Periodically prints the cache statistics to the log.
+ */
+ static class StatisticsThread extends Thread {
+ SlabCache ourcache;
+
+ public StatisticsThread(SlabCache slabCache) {
+ super("SlabCache.StatisticsThread");
+ setDaemon(true);
+ this.ourcache = slabCache;
+ }
+
+ @Override
+ public void run() {
+ ourcache.slabstats.logStats(ourcache);
+ }
+
+ }
+
+ /**
+ * Just like CacheStats, but slab-specific: fine-grained profiling of the
+ * sizes of cached blocks, bucketed on a log scale.
+ */
+ static class SlabStats {
+ // log(Integer.MAX_VALUE) bounds the log of the largest size anyone will
+ // ever try to cache; we multiply by 10 for finer-grained buckets.
+ private final int MULTIPLIER = 10;
+ private final int NUMDIVISIONS = (int) (Math.log(Integer.MAX_VALUE) * MULTIPLIER);
+ private final AtomicLong[] counts = new AtomicLong[NUMDIVISIONS];
+
+ public SlabStats() {
+ for (int i = 0; i < NUMDIVISIONS; i++) {
+ counts[i] = new AtomicLong();
+ }
+ }
+
+ public void addin(int size) {
+ int index = (int) (Math.log(size) * MULTIPLIER);
+ counts[index].incrementAndGet();
+ }
+
+ public AtomicLong[] getUsage() {
+ return counts;
+ }
+
+ public void logStats(SlabCache slabCache) {
+ for (SingleSizeCache s : slabCache.sizer.values()) {
+ s.logStats();
+ }
+ AtomicLong[] fineGrainedStats = getUsage();
+ int multiplier = MULTIPLIER;
+ SlabCache.LOG.info("Current heap size is: "
+ + StringUtils.humanReadableInt(slabCache.heapSize()));
+ for (int i = 0; i < fineGrainedStats.length; i++) {
+ double lowerbound = Math.pow(Math.E, (double) i / (double) multiplier
+ - 0.5);
+ double upperbound = Math.pow(Math.E, (double) i / (double) multiplier
+ + 0.5);
+
+ SlabCache.LOG.info("From "
+ + StringUtils.humanReadableInt((long) lowerbound) + "- "
+ + StringUtils.humanReadableInt((long) upperbound) + ": "
+ + StringUtils.humanReadableInt(fineGrainedStats[i].get())
+ + " requests");
+
+ }
+ }
+ }
+
+ public int evictBlocksByPrefix(String prefix) {
+ int numEvicted = 0;
+ for (String key : backingStore.keySet()) {
+ if (key.startsWith(prefix)) {
+ if (evictBlock(key))
+ ++numEvicted;
+ }
+ }
+ return numEvicted;
+ }
+
+ /*
+ * Not implemented. Doing this from the off-heap cache would be extremely
+ * costly: every object would have to be copied onto the heap once.
+ */
+ @Override
+ public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries(
+ Configuration conf) {
+ throw new UnsupportedOperationException();
+ }
+
+}
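
To make the slab sizing arithmetic above concrete, here is a minimal standalone sketch of the same computation. The 1 GB cache size, 64 KB average block size, and 0.80/0.20 proportions are illustrative values only; just the formula (cacheSize * proportion / slabSize, rounded down) comes from the code above.

    import java.math.BigDecimal;

    public class SlabSizingExample {
      public static void main(String[] args) {
        long cacheSize = 1024L * 1024 * 1024;  // 1 GB of direct memory (illustrative)
        long avgBlockSize = 64 * 1024;         // 64 KB average block size (illustrative)
        BigDecimal[] proportions = { new BigDecimal("0.80"), new BigDecimal("0.20") };
        long[] slabSizes = { avgBlockSize * 11 / 10, avgBlockSize * 21 / 10 };

        for (int i = 0; i < slabSizes.length; i++) {
          // Same arithmetic as SlabCache: cacheSize * proportion / slabSize, rounded down.
          int numBlocks = new BigDecimal(cacheSize).multiply(proportions[i])
              .divide(new BigDecimal(slabSizes[i]), BigDecimal.ROUND_DOWN).intValue();
          System.out.println("Slab " + i + ": blockSize=" + slabSizes[i]
              + ", numBlocks=" + numBlocks);
        }
      }
    }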
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/slab/SlabItemEvictionWatcher.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile.slab;
+
+/**
+ * Interface for objects that want to know when an eviction occurs.
+ */
+interface SlabItemEvictionWatcher {
+
+ /**
+ * This is called as a callback by the EvictionListener in each of the
+ * SingleSizeCaches.
+ *
+ * @param key the key of the item being evicted
+ * @param callAssignedCache whether to call back into the SingleSizeCache
+ * the key was originally assigned to
+ * @return true if the key was present and has now been evicted
+ */
+ boolean onEviction(String key, boolean callAssignedCache);
+
+}
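
An implementation of this interface only has to honor the contract that onEviction reports whether the key was actually tracked. A minimal sketch, with a hypothetical backing map standing in for SlabCache's backingStore:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Hypothetical watcher: removes the key from its own bookkeeping on eviction.
    class MapBackedEvictionWatcher implements SlabItemEvictionWatcher {
      private final ConcurrentMap<String, Object> backing =
          new ConcurrentHashMap<String, Object>();

      @Override
      public boolean onEviction(String key, boolean callAssignedCache) {
        // Mirrors SlabCache.onEviction: true only if the key was actually present.
        return backing.remove(key) != null;
      }
    }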
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Aug 26 18:53:00 2011
@@ -99,7 +99,8 @@ import org.apache.hadoop.hbase.filter.Co
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -630,9 +631,9 @@ public class HRegionServer implements HR
closeUserRegions(this.abortRequested);
} else if (this.stopping) {
LOG.info("Stopping meta regions, if the HRegionServer hosts any");
-
+
boolean allUserRegionsOffline = areAllUserRegionsOffline();
-
+
if (allUserRegionsOffline) {
// Set stopped if no requests since last time we went around the loop.
// The remaining meta regions will be closed on our way out.
@@ -1072,13 +1073,13 @@ public class HRegionServer implements HR
super("CompactionChecker", sleepTime, h);
this.instance = h;
LOG.info("Runs every " + StringUtils.formatTime(sleepTime));
-
+
/* MajorCompactPriority is configurable.
* If not set, the compaction will use default priority.
*/
this.majorCompactPriority = this.instance.conf.
getInt("hbase.regionserver.compactionChecker.majorCompactPriority",
- DEFAULT_PRIORITY);
+ DEFAULT_PRIORITY);
}
@Override
@@ -1093,14 +1094,14 @@ public class HRegionServer implements HR
this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests compaction");
} else if (s.isMajorCompaction()) {
- if (majorCompactPriority == DEFAULT_PRIORITY ||
+ if (majorCompactPriority == DEFAULT_PRIORITY ||
majorCompactPriority > r.getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests major compaction; use default priority");
} else {
this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests major compaction; use configured priority",
- this.majorCompactPriority);
+ this.majorCompactPriority);
}
}
} catch (IOException e) {
@@ -1225,7 +1226,7 @@ public class HRegionServer implements HR
totalStaticBloomSize += store.getTotalStaticBloomSize();
}
}
-
+
hdfsBlocksDistribution.add(r.getHDFSBlocksDistribution());
}
this.metrics.stores.set(stores);
@@ -1262,7 +1263,7 @@ public class HRegionServer implements HR
getServerName().getHostname());
int percent = (int) (localityIndex * 100);
this.metrics.hdfsBlocksLocalityIndex.set(percent);
-
+
}
/**
@@ -1351,7 +1352,7 @@ public class HRegionServer implements HR
while (true) {
try {
this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf);
- this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
+ this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class);
this.infoServer.setAttribute(REGIONSERVER, this);
this.infoServer.start();
break;
@@ -1834,7 +1835,7 @@ public class HRegionServer implements HR
+ "regionName is null");
}
HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(put.getLockId());
+ Integer lock = getLockFromId(put.getLockId());
if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost()
.preCheckAndPut(row, family, qualifier, compareOp, comparator, put);
@@ -1873,7 +1874,7 @@ public class HRegionServer implements HR
+ "regionName is null");
}
HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(delete.getLockId());
+ Integer lock = getLockFromId(delete.getLockId());
WritableByteArrayComparable comparator = new BinaryComparator(value);
if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
@@ -1914,7 +1915,7 @@ public class HRegionServer implements HR
+ "regionName is null");
}
HRegion region = getRegion(regionName);
- Integer lock = getLockFromId(delete.getLockId());
+ Integer lock = getLockFromId(delete.getLockId());
if (region.getCoprocessorHost() != null) {
Boolean result = region.getCoprocessorHost().preCheckAndDelete(row,
family, qualifier, compareOp, comparator, delete);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Aug 26 18:53:00 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.io.HalfSt
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.DoubleBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV1;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.util.Bloo
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
@@ -375,9 +377,15 @@ public class StoreFile {
// Calculate the amount of heap to give the heap.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
long cacheSize = (long)(mu.getMax() * cachePercentage);
+ int blockSize = conf.getInt("hbase.offheapcache.minblocksize", HFile.DEFAULT_BLOCKSIZE);
+ long offHeapCacheSize = (long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0.95) * DirectMemoryUtils.getDirectMemorySize());
LOG.info("Allocating LruBlockCache with maximum size " +
StringUtils.humanReadableInt(cacheSize));
- hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL);
+ if (offHeapCacheSize <= 0) {
+ hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL);
+ } else {
+ hfileBlockCache = new DoubleBlockCache(cacheSize, offHeapCacheSize, DEFAULT_BLOCKSIZE_SMALL, blockSize);
+ }
return hfileBlockCache;
}
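
The effect of this hunk: with the default hbase.offheapcache.percentage of 0.95 and, say, -XX:MaxDirectMemorySize=1g, offHeapCacheSize comes out to 0.95 * 1073741824, roughly 1019 MB, so a DoubleBlockCache is built over both the on-heap LRU and the off-heap slab cache; with the JVM flag unset, getDirectMemorySize() returns 0 and the plain LruBlockCache is kept. A sketch of just the decision, with illustrative values in place of the Configuration lookups:

    // Illustrative values; in StoreFile these come from conf and the JVM flags.
    long maxDirectMemory = 1024L * 1024 * 1024;         // -XX:MaxDirectMemorySize=1g
    float offHeapPercentage = 0.95f;                    // hbase.offheapcache.percentage
    long offHeapCacheSize = (long) (offHeapPercentage * maxDirectMemory);
    boolean useDoubleBlockCache = offHeapCacheSize > 0; // false when the flag is unset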
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/DirectMemoryUtils.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,95 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+
+public class DirectMemoryUtils {
+ /**
+ * Parses the JVM input arguments for -XX:MaxDirectMemorySize.
+ * @return the setting of -XX:MaxDirectMemorySize in bytes, or 0 if
+ * -XX:MaxDirectMemorySize is not set.
+ */
+ public static long getDirectMemorySize() {
+ RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
+ List<String> arguments = runtimeMxBean.getInputArguments();
+ long multiplier = 1; // bytes, when no unit suffix is given
+ for (String s : arguments) {
+ if (s.contains("-XX:MaxDirectMemorySize=")) {
+ String memSize = s.toLowerCase()
+ .replace("-xx:maxdirectmemorysize=", "").trim();
+
+ if (memSize.contains("k")) {
+ multiplier = 1024;
+ }
+
+ else if (memSize.contains("m")) {
+ multiplier = 1048576;
+ }
+
+ else if (memSize.contains("g")) {
+ multiplier = 1073741824;
+ }
+ memSize = memSize.replaceAll("[^\\d]", "");
+
+ long retValue = Long.parseLong(memSize);
+ return retValue * multiplier;
+ }
+
+ }
+ return 0;
+ }
+
+ /**
+ * DirectByteBuffers are garbage collected by using a phantom reference and a
+ * reference queue. Every once in a while, the JVM checks the reference queue
+ * and cleans the DirectByteBuffers. However, as this doesn't happen
+ * immediately after discarding all references to a DirectByteBuffer, it's
+ * easy to run into OutOfMemoryError when churning through DirectByteBuffers.
+ * This function explicitly calls the Cleaner method of a DirectByteBuffer.
+ *
+ * @param toBeDestroyed
+ * The DirectByteBuffer that will be "cleaned". Utilizes reflection.
+ *
+ */
+ public static void destroyDirectByteBuffer(ByteBuffer toBeDestroyed)
+ throws IllegalArgumentException, IllegalAccessException,
+ InvocationTargetException, SecurityException, NoSuchMethodException {
+
+ Preconditions.checkArgument(toBeDestroyed.isDirect(),
+ "toBeDestroyed isn't direct!");
+
+ Method cleanerMethod = toBeDestroyed.getClass().getMethod("cleaner");
+ cleanerMethod.setAccessible(true);
+ Object cleaner = cleanerMethod.invoke(toBeDestroyed);
+ Method cleanMethod = cleaner.getClass().getMethod("clean");
+ cleanMethod.setAccessible(true);
+ cleanMethod.invoke(cleaner);
+
+ }
+}
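
Typical use of the two utilities above; a minimal sketch (the 64 KB buffer size is arbitrary):

    import java.nio.ByteBuffer;

    import org.apache.hadoop.hbase.util.DirectMemoryUtils;

    public class DirectMemoryExample {
      public static void main(String[] args) throws Exception {
        // 0 unless the JVM was started with -XX:MaxDirectMemorySize=...
        long budget = DirectMemoryUtils.getDirectMemorySize();
        System.out.println("Direct memory budget: " + budget + " bytes");

        ByteBuffer buf = ByteBuffer.allocateDirect(64 * 1024);
        // ... fill and drain the buffer ...
        // Release the native memory now rather than waiting for the GC
        // to process the buffer's phantom reference.
        DirectMemoryUtils.destroyDirectByteBuffer(buf);
      }
    }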
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java?rev=1162207&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java Fri Aug 26 18:53:00 2011
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+
+public class CacheTestUtils {
+
+ public static void testCacheMultiThreaded(final BlockCache toBeTested,
+ final int blockSize, final int numThreads, final int numQueries,
+ final double passingScore) throws Exception {
+
+ Configuration conf = new Configuration();
+ MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
+ conf);
+
+ final AtomicInteger totalQueries = new AtomicInteger();
+ final ConcurrentLinkedQueue<HFileBlockPair> blocksToTest = new ConcurrentLinkedQueue<HFileBlockPair>();
+ final AtomicInteger hits = new AtomicInteger();
+ final AtomicInteger miss = new AtomicInteger();
+
+ HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numQueries);
+ blocksToTest.addAll(Arrays.asList(blocks));
+
+ for (int i = 0; i < numThreads; i++) {
+ TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+ @Override
+ public void doAnAction() throws Exception {
+ if (!blocksToTest.isEmpty()) {
+ HFileBlockPair ourBlock = blocksToTest.remove();
+ toBeTested.cacheBlock(ourBlock.blockName, ourBlock.block);
+ Cacheable retrievedBlock = toBeTested.getBlock(ourBlock.blockName,
+ false);
+ if (retrievedBlock != null) {
+ assertEquals(ourBlock.block, retrievedBlock);
+ hits.incrementAndGet();
+ } else {
+ miss.incrementAndGet();
+ }
+ totalQueries.incrementAndGet();
+ }
+ }
+ };
+ ctx.addThread(t);
+ }
+ ctx.startThreads();
+ while (!blocksToTest.isEmpty() && ctx.shouldRun()) {
+ Thread.sleep(10);
+ }
+ ctx.stop();
+ if ((double) hits.get() / ((double) hits.get() + (double) miss.get()) < passingScore) {
+ fail("Too many nulls returned. Hits: " + hits.get() + " Misses: " + miss.get());
+ }
+ }
+
+ public static void testCacheSimple(BlockCache toBeTested, int blockSize,
+ int numBlocks) throws Exception {
+
+ HFileBlockPair[] blocks = generateHFileBlocks(blockSize, numBlocks);
+ // Confirm empty
+ for (HFileBlockPair block : blocks) {
+ assertNull(toBeTested.getBlock(block.blockName, true));
+ }
+
+ // Add blocks
+ for (HFileBlockPair block : blocks) {
+ toBeTested.cacheBlock(block.blockName, block.block);
+ }
+
+ // Check if all blocks are properly cached and contain the right
+ // information, or the blocks are null.
+ // MapMaker makes no guarantees when it will evict, so neither can we.
+
+ for (HFileBlockPair block : blocks) {
+ HFileBlock buf = (HFileBlock) toBeTested.getBlock(block.blockName, true);
+ if (buf != null) {
+ assertEquals(block.block, buf);
+ }
+
+ }
+
+ // Re-add blocks that are still cached: re-caching a live block must throw.
+
+ for (HFileBlockPair block : blocks) {
+ try {
+ if (toBeTested.getBlock(block.blockName, true) != null) {
+ toBeTested.cacheBlock(block.blockName, block.block);
+ fail("Cache should not allow re-caching a block");
+ }
+ } catch (RuntimeException re) {
+ // expected
+ }
+ }
+
+ }
+
+ public static void hammerSingleKey(final BlockCache toBeTested,
+ int blockSize, int numThreads, int numQueries) throws Exception {
+ final HFileBlockPair kv = generateHFileBlocks(blockSize, 1)[0];
+ Configuration conf = new Configuration();
+ MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(
+ conf);
+
+ final AtomicInteger totalQueries = new AtomicInteger();
+ toBeTested.cacheBlock(kv.blockName, kv.block);
+
+ for (int i = 0; i < numThreads; i++) {
+ TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
+ @Override
+ public void doAnAction() throws Exception {
+ assertEquals(kv.block, toBeTested.getBlock(kv.blockName, false));
+ totalQueries.incrementAndGet();
+ }
+ };
+
+ ctx.addThread(t);
+ }
+
+ ctx.startThreads();
+ while (totalQueries.get() < numQueries && ctx.shouldRun()) {
+ Thread.sleep(10);
+ }
+ ctx.stop();
+ }
+
+ private static HFileBlockPair[] generateHFileBlocks(int blockSize,
+ int numBlocks) {
+ HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
+ Random rand = new Random();
+ HashSet<String> usedStrings = new HashSet<String>();
+ for (int i = 0; i < numBlocks; i++) {
+
+ // The serialized size of the buffer needs to match blockSize. So we
+ // declare our data size to be smaller than it by the serialization space
+ // required.
+
+ ByteBuffer cachedBuffer = ByteBuffer.allocate(blockSize
+ - HFileBlock.EXTRA_SERIALIZATION_SPACE);
+ rand.nextBytes(cachedBuffer.array());
+ cachedBuffer.rewind();
+ int onDiskSizeWithoutHeader = blockSize
+ - HFileBlock.EXTRA_SERIALIZATION_SPACE;
+ int uncompressedSizeWithoutHeader = blockSize
+ - HFileBlock.EXTRA_SERIALIZATION_SPACE;
+ long prevBlockOffset = rand.nextLong();
+ BlockType.DATA.write(cachedBuffer);
+ cachedBuffer.putInt(onDiskSizeWithoutHeader);
+ cachedBuffer.putInt(uncompressedSizeWithoutHeader);
+ cachedBuffer.putLong(prevBlockOffset);
+ cachedBuffer.rewind();
+
+ HFileBlock generated = new HFileBlock(BlockType.DATA,
+ onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader,
+ prevBlockOffset, cachedBuffer, false, blockSize);
+
+ String strKey;
+ /* Draw random keys until we get one that has not been used before */
+ do {
+ strKey = Long.toString(rand.nextLong());
+ } while (!usedStrings.add(strKey));
+
+ returnedBlocks[i] = new HFileBlockPair();
+ returnedBlocks[i].blockName = strKey;
+ returnedBlocks[i].block = generated;
+ }
+ return returnedBlocks;
+ }
+
+ private static class HFileBlockPair {
+ String blockName;
+ HFileBlock block;
+ }
+}
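
These helpers are cache-agnostic, so any BlockCache implementation can be exercised with them. A minimal sketch of a test driving the on-heap cache through the multithreaded helper; the cache and block sizes and the 0.80 passing score are illustrative (the LruBlockCache constructor shape matches its use in StoreFile above):

    import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
    import org.apache.hadoop.hbase.io.hfile.LruBlockCache;

    import org.junit.Test;

    public class TestBlockCacheWithUtils {
      private static final int BLOCK_SIZE = 8 * 1024;         // illustrative
      private static final long CACHE_SIZE = 8 * 1024 * 1024; // illustrative

      @Test
      public void testHeavyReadWrite() throws Exception {
        LruBlockCache cache = new LruBlockCache(CACHE_SIZE, BLOCK_SIZE);
        // 8 threads over 100 blocks; require at least 80% of lookups to hit.
        CacheTestUtils.testCacheMultiThreaded(cache, BLOCK_SIZE, 8, 100, 0.80);
      }
    }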
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCachedBlockQueue.java Fri Aug 26 18:53:00 2011
@@ -21,10 +21,8 @@ package org.apache.hadoop.hbase.io.hfile
import java.nio.ByteBuffer;
-import org.apache.hadoop.hbase.io.HeapSize;
import java.util.LinkedList;
-
import junit.framework.TestCase;
public class TestCachedBlockQueue extends TestCase {
@@ -132,10 +130,26 @@ public class TestCachedBlockQueue extend
{
public CachedBlock(final long heapSize, String name, long accessTime) {
super(name,
- new HeapSize(){
+ new Cacheable(){
@Override
public long heapSize() {
return ((int)(heapSize - CachedBlock.PER_BLOCK_OVERHEAD));
+ }
+
+ @Override
+ public int getSerializedLength() {
+ return 0;
+ }
+
+ @Override
+ public void serialize(ByteBuffer destination) {
+ }
+
+ @Override
+ public CacheableDeserializer<Cacheable> getDeserializer() {
+ return null;
}},
accessTime,false);
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java?rev=1162207&r1=1162206&r2=1162207&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLruBlockCache.java Fri Aug 26 18:53:00 2011
@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import java.nio.ByteBuffer;
import java.util.Random;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -510,7 +511,7 @@ public class TestLruBlockCache extends T
LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
}
- private static class CachedItem implements HeapSize {
+ private static class CachedItem implements Cacheable {
String blockName;
int size;
@@ -531,5 +532,20 @@ public class TestLruBlockCache extends T
+ ClassSize.align(blockName.length())
+ ClassSize.align(size);
}
+
+ @Override
+ public int getSerializedLength() {
+ return 0;
+ }
+
+ @Override
+ public CacheableDeserializer<Cacheable> getDeserializer() {
+ return null;
+ }
+
+ @Override
+ public void serialize(ByteBuffer destination) {
+ }
+
}
}