You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jm...@apache.org on 2015/05/01 17:28:19 UTC
[33/50] [abbrv] hbase git commit: HBASE-13301 Possible memory leak in
BucketCache
HBASE-13301 Possible memory leak in BucketCache
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4f151444
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4f151444
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4f151444
Branch: refs/heads/hbase-11339
Commit: 4f151444b58ae85b93f76254961358932e0ffb9b
Parents: 71536bd
Author: zhangduo <zh...@wandoujia.com>
Authored: Sat Apr 11 10:43:43 2015 +0800
Committer: zhangduo <zh...@wandoujia.com>
Committed: Tue Apr 14 17:41:46 2015 +0800
----------------------------------------------------------------------
.../hbase/io/hfile/bucket/BucketCache.java | 182 +++++++++++--------
.../hbase/io/hfile/bucket/CachedEntryQueue.java | 20 +-
.../org/apache/hadoop/hbase/util/IdLock.java | 16 ++
.../hadoop/hbase/io/hfile/CacheTestUtils.java | 6 +-
.../hbase/io/hfile/bucket/TestBucketCache.java | 87 ++++++---
5 files changed, 196 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 7dda0e6..6a5c884 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -39,6 +39,7 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -109,13 +110,14 @@ public class BucketCache implements BlockCache, HeapSize {
final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
// Store/read block data
- IOEngine ioEngine;
+ final IOEngine ioEngine;
// Store the block in this map before writing it to cache
@VisibleForTesting
- Map<BlockCacheKey, RAMQueueEntry> ramCache;
+ final ConcurrentMap<BlockCacheKey, RAMQueueEntry> ramCache;
// In this map, store the block's meta data like offset, length
- private Map<BlockCacheKey, BucketEntry> backingMap;
+ @VisibleForTesting
+ ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
/**
* Flag if the cache is enabled or not... We shut it off if there are IO
@@ -132,14 +134,14 @@ public class BucketCache implements BlockCache, HeapSize {
* to the BucketCache. It then updates the ramCache and backingMap accordingly.
*/
@VisibleForTesting
- ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
+ final ArrayList<BlockingQueue<RAMQueueEntry>> writerQueues =
new ArrayList<BlockingQueue<RAMQueueEntry>>();
@VisibleForTesting
- WriterThread writerThreads[];
+ final WriterThread[] writerThreads;
/** Volatile boolean to track if free space is in process or not */
private volatile boolean freeInProgress = false;
- private Lock freeSpaceLock = new ReentrantLock();
+ private final Lock freeSpaceLock = new ReentrantLock();
private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<Integer>();
@@ -152,17 +154,16 @@ public class BucketCache implements BlockCache, HeapSize {
/** Cache access count (sequential ID) */
private final AtomicLong accessCount = new AtomicLong(0);
- private final Object[] cacheWaitSignals;
private static final int DEFAULT_CACHE_WAIT_TIME = 50;
// Used in test now. If the flag is false and the cache speed is very fast,
// bucket cache will skip some blocks when caching. If the flag is true, we
// will wait blocks flushed to IOEngine for some time when caching
boolean wait_when_cache = false;
- private BucketCacheStats cacheStats = new BucketCacheStats();
+ private final BucketCacheStats cacheStats = new BucketCacheStats();
- private String persistencePath;
- private long cacheCapacity;
+ private final String persistencePath;
+ private final long cacheCapacity;
/** Approximate block size */
private final long blockSize;
@@ -182,7 +183,8 @@ public class BucketCache implements BlockCache, HeapSize {
*
* TODO:We could extend the IdLock to IdReadWriteLock for better.
*/
- private IdLock offsetLock = new IdLock();
+ @VisibleForTesting
+ final IdLock offsetLock = new IdLock();
private final ConcurrentIndex<String, BlockCacheKey> blocksByHFile =
new ConcurrentIndex<String, BlockCacheKey>(new Comparator<BlockCacheKey>() {
@@ -216,7 +218,6 @@ public class BucketCache implements BlockCache, HeapSize {
throws FileNotFoundException, IOException {
this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
this.writerThreads = new WriterThread[writerThreadNum];
- this.cacheWaitSignals = new Object[writerThreadNum];
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
// Enough for about 32TB of cache!
@@ -231,7 +232,6 @@ public class BucketCache implements BlockCache, HeapSize {
bucketAllocator = new BucketAllocator(capacity, bucketSizes);
for (int i = 0; i < writerThreads.length; ++i) {
writerQueues.add(new ArrayBlockingQueue<RAMQueueEntry>(writerQLen));
- this.cacheWaitSignals[i] = new Object();
}
assert writerQueues.size() == writerThreads.length;
@@ -252,7 +252,7 @@ public class BucketCache implements BlockCache, HeapSize {
final String threadName = Thread.currentThread().getName();
this.cacheEnabled = true;
for (int i = 0; i < writerThreads.length; ++i) {
- writerThreads[i] = new WriterThread(writerQueues.get(i), i);
+ writerThreads[i] = new WriterThread(writerQueues.get(i));
writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
writerThreads[i].setDaemon(true);
}
@@ -344,38 +344,39 @@ public class BucketCache implements BlockCache, HeapSize {
* @param inMemory if block is in-memory
* @param wait if true, blocking wait when queue is full
*/
- public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem,
- boolean inMemory, boolean wait) {
- if (!cacheEnabled)
+ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+ boolean wait) {
+ if (!cacheEnabled) {
return;
+ }
- if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
+ if (backingMap.containsKey(cacheKey)) {
return;
+ }
/*
- * Stuff the entry into the RAM cache so it can get drained to the
- * persistent store
+ * Stuff the entry into the RAM cache so it can get drained to the persistent store
*/
- RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
- accessCount.incrementAndGet(), inMemory);
- ramCache.put(cacheKey, re);
+ RAMQueueEntry re =
+ new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
+ if (ramCache.putIfAbsent(cacheKey, re) != null) {
+ return;
+ }
int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
- boolean successfulAddition = bq.offer(re);
- if (!successfulAddition && wait) {
- synchronized (cacheWaitSignals[queueNum]) {
- try {
- successfulAddition = bq.offer(re);
- if (!successfulAddition) cacheWaitSignals[queueNum].wait(DEFAULT_CACHE_WAIT_TIME);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
+ boolean successfulAddition = false;
+ if (wait) {
+ try {
+ successfulAddition = bq.offer(re, DEFAULT_CACHE_WAIT_TIME, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
+ } else {
successfulAddition = bq.offer(re);
}
if (!successfulAddition) {
- ramCache.remove(cacheKey);
- failedBlockAdditions.incrementAndGet();
+ ramCache.remove(cacheKey);
+ failedBlockAdditions.incrementAndGet();
} else {
this.blockNumber.incrementAndGet();
this.heapSize.addAndGet(cachedItem.heapSize());
@@ -394,11 +395,14 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
- if (!cacheEnabled)
+ if (!cacheEnabled) {
return null;
+ }
RAMQueueEntry re = ramCache.get(key);
if (re != null) {
- if (updateCacheMetrics) cacheStats.hit(caching);
+ if (updateCacheMetrics) {
+ cacheStats.hit(caching);
+ }
re.access(accessCount.incrementAndGet());
return re.getData();
}
@@ -408,6 +412,9 @@ public class BucketCache implements BlockCache, HeapSize {
IdLock.Entry lockEntry = null;
try {
lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
+ // We cannot read here merely because backingMap contains the given key, since the entry's
+ // offset may have changed. If we locked on BlockCacheKey instead of the offset, an existence
+ // check alone would suffice here.
if (bucketEntry.equals(backingMap.get(key))) {
int len = bucketEntry.getLength();
ByteBuffer bb = ByteBuffer.allocate(len);
@@ -438,13 +445,27 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
}
- if (!repeat && updateCacheMetrics) cacheStats.miss(caching);
+ if (!repeat && updateCacheMetrics) {
+ cacheStats.miss(caching);
+ }
return null;
}
+ @VisibleForTesting
+ void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
+ bucketAllocator.freeBlock(bucketEntry.offset());
+ realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+ blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
+ if (decrementBlockNumber) {
+ this.blockNumber.decrementAndGet();
+ }
+ }
+
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
- if (!cacheEnabled) return false;
+ if (!cacheEnabled) {
+ return false;
+ }
RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
if (removedBlock != null) {
this.blockNumber.decrementAndGet();
@@ -462,13 +483,8 @@ public class BucketCache implements BlockCache, HeapSize {
IdLock.Entry lockEntry = null;
try {
lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
- if (bucketEntry.equals(backingMap.remove(cacheKey))) {
- bucketAllocator.freeBlock(bucketEntry.offset());
- realCacheSize.addAndGet(-1 * bucketEntry.getLength());
- blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
- if (removedBlock == null) {
- this.blockNumber.decrementAndGet();
- }
+ if (backingMap.remove(cacheKey, bucketEntry)) {
+ blockEvicted(cacheKey, bucketEntry, removedBlock == null);
} else {
return false;
}
@@ -705,13 +721,10 @@ public class BucketCache implements BlockCache, HeapSize {
@VisibleForTesting
class WriterThread extends HasThread {
private final BlockingQueue<RAMQueueEntry> inputQueue;
- private final int threadNO;
private volatile boolean writerEnabled = true;
- WriterThread(BlockingQueue<RAMQueueEntry> queue, int threadNO) {
- super();
+ WriterThread(BlockingQueue<RAMQueueEntry> queue) {
this.inputQueue = queue;
- this.threadNO = threadNO;
}
// Used for test
@@ -728,9 +741,6 @@ public class BucketCache implements BlockCache, HeapSize {
try {
// Blocks
entries = getRAMQueueEntries(inputQueue, entries);
- synchronized (cacheWaitSignals[threadNO]) {
- cacheWaitSignals[threadNO].notifyAll();
- }
} catch (InterruptedException ie) {
if (!cacheEnabled) break;
}
@@ -755,7 +765,9 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@VisibleForTesting
void doDrain(final List<RAMQueueEntry> entries) throws InterruptedException {
- if (entries.isEmpty()) return;
+ if (entries.isEmpty()) {
+ return;
+ }
// This method is a little hard to follow. We run through the passed in entries and for each
// successful add, we add a non-null BucketEntry to the below bucketEntries. Later we must
// do cleanup making sure we've cleared ramCache of all entries regardless of whether we
@@ -830,6 +842,21 @@ public class BucketCache implements BlockCache, HeapSize {
RAMQueueEntry ramCacheEntry = ramCache.remove(key);
if (ramCacheEntry != null) {
heapSize.addAndGet(-1 * entries.get(i).getData().heapSize());
+ } else if (bucketEntries[i] != null){
+ // Block should have already been evicted. Remove it and free space.
+ IdLock.Entry lockEntry = null;
+ try {
+ lockEntry = offsetLock.getLockEntry(bucketEntries[i].offset());
+ if (backingMap.remove(key, bucketEntries[i])) {
+ blockEvicted(key, bucketEntries[i], false);
+ }
+ } catch (IOException e) {
+ LOG.warn("failed to free space for " + key, e);
+ } finally {
+ if (lockEntry != null) {
+ offsetLock.releaseLockEntry(lockEntry);
+ }
+ }
}
}
@@ -1055,23 +1082,35 @@ public class BucketCache implements BlockCache, HeapSize {
* up the long. Doubt we'll see devices this big for ages. Offsets are divided
* by 256. So 5 bytes gives us 256TB or so.
*/
- static class BucketEntry implements Serializable, Comparable<BucketEntry> {
+ static class BucketEntry implements Serializable {
private static final long serialVersionUID = -6741504807982257534L;
+
+ // access counter comparator, descending order
+ static final Comparator<BucketEntry> COMPARATOR = new Comparator<BucketCache.BucketEntry>() {
+
+ @Override
+ public int compare(BucketEntry o1, BucketEntry o2) {
+ long accessCounter1 = o1.accessCounter;
+ long accessCounter2 = o2.accessCounter;
+ return accessCounter1 < accessCounter2 ? 1 : accessCounter1 == accessCounter2 ? 0 : -1;
+ }
+ };
+
private int offsetBase;
private int length;
private byte offset1;
byte deserialiserIndex;
- private volatile long accessTime;
+ private volatile long accessCounter;
private BlockPriority priority;
/**
* Time this block was cached. Presumes we are created just before we are added to the cache.
*/
private final long cachedTime = System.nanoTime();
- BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
+ BucketEntry(long offset, int length, long accessCounter, boolean inMemory) {
setOffset(offset);
this.length = length;
- this.accessTime = accessTime;
+ this.accessCounter = accessCounter;
if (inMemory) {
this.priority = BlockPriority.MEMORY;
} else {
@@ -1110,10 +1149,10 @@ public class BucketCache implements BlockCache, HeapSize {
}
/**
- * Block has been accessed. Update its local access time.
+ * Block has been accessed. Update its local access counter.
*/
- public void access(long accessTime) {
- this.accessTime = accessTime;
+ public void access(long accessCounter) {
+ this.accessCounter = accessCounter;
if (this.priority == BlockPriority.SINGLE) {
this.priority = BlockPriority.MULTI;
}
@@ -1123,17 +1162,6 @@ public class BucketCache implements BlockCache, HeapSize {
return this.priority;
}
- @Override
- public int compareTo(BucketEntry that) {
- if(this.accessTime == that.accessTime) return 0;
- return this.accessTime < that.accessTime ? 1 : -1;
- }
-
- @Override
- public boolean equals(Object that) {
- return this == that;
- }
-
public long getCachedTime() {
return cachedTime;
}
@@ -1204,14 +1232,14 @@ public class BucketCache implements BlockCache, HeapSize {
static class RAMQueueEntry {
private BlockCacheKey key;
private Cacheable data;
- private long accessTime;
+ private long accessCounter;
private boolean inMemory;
- public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessTime,
+ public RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
boolean inMemory) {
this.key = bck;
this.data = data;
- this.accessTime = accessTime;
+ this.accessCounter = accessCounter;
this.inMemory = inMemory;
}
@@ -1223,8 +1251,8 @@ public class BucketCache implements BlockCache, HeapSize {
return key;
}
- public void access(long accessTime) {
- this.accessTime = accessTime;
+ public void access(long accessCounter) {
+ this.accessCounter = accessCounter;
}
public BucketEntry writeToCache(final IOEngine ioEngine,
@@ -1236,7 +1264,7 @@ public class BucketCache implements BlockCache, HeapSize {
// This cacheable thing can't be serialized...
if (len == 0) return null;
long offset = bucketAllocator.allocateBlock(len);
- BucketEntry bucketEntry = new BucketEntry(offset, len, accessTime, inMemory);
+ BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory);
bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
try {
if (data instanceof HFileBlock) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
index b6954bb..0e33a56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/CachedEntryQueue.java
@@ -54,23 +54,23 @@ public class CachedEntryQueue {
*/
public CachedEntryQueue(long maxSize, long blockSize) {
int initialSize = (int) (maxSize / blockSize);
- if (initialSize == 0)
+ if (initialSize == 0) {
initialSize++;
- queue = MinMaxPriorityQueue
- .orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
- public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
- Entry<BlockCacheKey, BucketEntry> entry2) {
- return entry1.getValue().compareTo(entry2.getValue());
- }
+ }
+ queue = MinMaxPriorityQueue.orderedBy(new Comparator<Map.Entry<BlockCacheKey, BucketEntry>>() {
+
+ public int compare(Entry<BlockCacheKey, BucketEntry> entry1,
+ Entry<BlockCacheKey, BucketEntry> entry2) {
+ return BucketEntry.COMPARATOR.compare(entry1.getValue(), entry2.getValue());
+ }
- }).expectedSize(initialSize).create();
+ }).expectedSize(initialSize).create();
cacheSize = 0;
this.maxSize = maxSize;
}
/**
* Attempt to add the specified entry to this queue.
- *
* <p>
* If the queue is smaller than the max size, or if the specified element is
* ordered after the smallest element in the queue, the element will be added
@@ -83,7 +83,7 @@ public class CachedEntryQueue {
cacheSize += entry.getValue().getLength();
} else {
BucketEntry head = queue.peek().getValue();
- if (entry.getValue().compareTo(head) > 0) {
+ if (BucketEntry.COMPARATOR.compare(entry.getValue(), head) > 0) {
cacheSize += entry.getValue().getLength();
cacheSize -= head.getLength();
if (cacheSize > maxSize) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
index b9d0983..fedf951 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Allows multiple concurrent clients to lock on a numeric id with a minimal
* memory overhead. The intended usage is as follows:
@@ -119,4 +121,18 @@ public class IdLock {
assert map.size() == 0;
}
+ @VisibleForTesting
+ public void waitForWaiters(long id, int numWaiters) throws InterruptedException {
+ for (Entry entry;;) {
+ entry = map.get(id);
+ if (entry != null) {
+ synchronized (entry) {
+ if (entry.numWaiters >= numWaiters) {
+ return;
+ }
+ }
+ }
+ Thread.sleep(100);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 5ef8cf0..b0a2ba2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -247,11 +247,11 @@ public class CacheTestUtils {
assertTrue(toBeTested.getStats().getEvictedCount() > 0);
}
- private static class ByteArrayCacheable implements Cacheable {
+ public static class ByteArrayCacheable implements Cacheable {
- static final CacheableDeserializer<Cacheable> blockDeserializer =
+ static final CacheableDeserializer<Cacheable> blockDeserializer =
new CacheableDeserializer<Cacheable>() {
-
+
@Override
public Cacheable deserialize(ByteBuffer b) throws IOException {
int len = b.getInt();
http://git-wip-us.apache.org/repos/asf/hbase/blob/4f151444/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index d29be01..99f5657 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -27,13 +28,14 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
-import org.apache.hadoop.hbase.testclassification.IOTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.IdLock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -44,24 +46,23 @@ import org.junit.runners.Parameterized;
/**
* Basic test of BucketCache.Puts and gets.
* <p>
- * Tests will ensure that blocks' data correctness under several threads
- * concurrency
+ * Tests will ensure that blocks' data correctness under several threads concurrency
*/
@RunWith(Parameterized.class)
-@Category({IOTests.class, SmallTests.class})
+@Category({ IOTests.class, SmallTests.class })
public class TestBucketCache {
private static final Random RAND = new Random();
- @Parameterized.Parameters(name="{index}: blockSize={0}, bucketSizes={1}")
+ @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
- { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
- { 16 * 1024, new int[] {
- 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
- 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
- 128 * 1024 + 1024 } }
- });
+ { 8192, null }, // TODO: why is 8k the default blocksize for these tests?
+ {
+ 16 * 1024,
+ new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024,
+ 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024,
+ 128 * 1024 + 1024 } } });
}
@Parameterized.Parameter(0)
@@ -76,7 +77,7 @@ public class TestBucketCache {
final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS;
final int NUM_THREADS = 1000;
final int NUM_QUERIES = 10000;
-
+
final long capacitySize = 32 * 1024 * 1024;
final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
@@ -86,16 +87,16 @@ public class TestBucketCache {
private class MockedBucketCache extends BucketCache {
public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
- int writerThreads, int writerQLen, String persistencePath)
- throws FileNotFoundException, IOException {
+ int writerThreads, int writerQLen, String persistencePath) throws FileNotFoundException,
+ IOException {
super(ioEngineName, capacity, blockSize, bucketSizes, writerThreads, writerQLen,
- persistencePath);
+ persistencePath);
super.wait_when_cache = true;
}
@Override
- public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf,
- boolean inMemory, boolean cacheDataInL1) {
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+ boolean cacheDataInL1) {
if (super.getBlock(cacheKey, true, false, true) != null) {
throw new RuntimeException("Cached an already cached block");
}
@@ -113,8 +114,9 @@ public class TestBucketCache {
@Before
public void setup() throws FileNotFoundException, IOException {
- cache = new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ cache =
+ new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
}
@After
@@ -142,7 +144,7 @@ public class TestBucketCache {
// Fill the allocated extents by choosing a random blocksize. Continues selecting blocks until
// the cache is completely filled.
List<Integer> tmp = new ArrayList<Integer>(BLOCKSIZES);
- for (int i = 0; !full; i++) {
+ while (!full) {
Integer blockSize = null;
try {
blockSize = randFrom(tmp);
@@ -156,9 +158,7 @@ public class TestBucketCache {
for (Integer blockSize : BLOCKSIZES) {
BucketSizeInfo bucketSizeInfo = mAllocator.roundUpToBucketSizeInfo(blockSize);
IndexStatistics indexStatistics = bucketSizeInfo.statistics();
- assertEquals(
- "unexpected freeCount for " + bucketSizeInfo,
- 0, indexStatistics.freeCount());
+ assertEquals("unexpected freeCount for " + bucketSizeInfo, 0, indexStatistics.freeCount());
}
for (long offset : allocations) {
@@ -182,4 +182,41 @@ public class TestBucketCache {
cache.stopWriterThreads();
CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
}
-}
\ No newline at end of file
+
+ // BucketCache.cacheBlock is async: it first adds the block to ramCache and the writer queue,
+ // then writer threads flush it to the bucket and put a reference entry in backingMap.
+ private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
+ Cacheable block) throws InterruptedException {
+ cache.cacheBlock(cacheKey, block);
+ while (!cache.backingMap.containsKey(cacheKey)) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Test
+ public void testMemoryLeak() throws Exception {
+ final BlockCacheKey cacheKey = new BlockCacheKey("dummy", 1L);
+ cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
+ new byte[10]));
+ long lockId = cache.backingMap.get(cacheKey).offset();
+ IdLock.Entry lockEntry = cache.offsetLock.getLockEntry(lockId);
+ Thread evictThread = new Thread("evict-block") {
+
+ @Override
+ public void run() {
+ cache.evictBlock(cacheKey);
+ }
+
+ };
+ evictThread.start();
+ cache.offsetLock.waitForWaiters(lockId, 1);
+ cache.blockEvicted(cacheKey, cache.backingMap.remove(cacheKey), true);
+ cacheAndWaitUntilFlushedToBucket(cache, cacheKey, new CacheTestUtils.ByteArrayCacheable(
+ new byte[10]));
+ cache.offsetLock.releaseLockEntry(lockEntry);
+ evictThread.join();
+ assertEquals(1L, cache.getBlockCount());
+ assertTrue(cache.getCurrentSize() > 0L);
+ assertTrue("We should have a block!", cache.iterator().hasNext());
+ }
+}