You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2017/09/06 15:49:03 UTC
hbase git commit: HBASE-18375: Fix the bug where the pool chunks from
ChunkCreator are deallocated and not returned to pool,
because there is no reference to them
Repository: hbase
Updated Branches:
refs/heads/branch-2 d7a74a75a -> 68ec2a9da
HBASE-18375: Fix the bug where the pool chunks from ChunkCreator are deallocated and not returned to pool, because there is no reference to them
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/68ec2a9d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/68ec2a9d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/68ec2a9d
Branch: refs/heads/branch-2
Commit: 68ec2a9da022f7824e9a45ef89a0c4b8bcb838f3
Parents: d7a74a7
Author: anastas <an...@yahoo-inc.com>
Authored: Wed Sep 6 18:48:53 2017 +0300
Committer: anastas <an...@yahoo-inc.com>
Committed: Wed Sep 6 18:48:53 2017 +0300
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/ChunkCreator.java | 127 ++++++++++++-------
.../hbase/regionserver/CompactingMemStore.java | 24 +++-
.../hbase/regionserver/CompactionPipeline.java | 4 +-
.../hbase/regionserver/MemStoreLABImpl.java | 10 ++
.../hbase/regionserver/TestMemStoreLAB.java | 12 +-
.../TestMemstoreLABWithoutPool.java | 3 +-
6 files changed, 124 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index 38d7136..61cf2b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -50,10 +50,9 @@ public class ChunkCreator {
// monotonically increasing chunkid
private AtomicInteger chunkID = new AtomicInteger(1);
// maps the chunk against the monotonically increasing chunk id. We need to preserve the
- // natural ordering of the key
- // CellChunkMap creation should convert the soft ref to hard reference
- private Map<Integer, SoftReference<Chunk>> chunkIdMap =
- new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
+ // natural ordering of the key. It also helps to protect from GC.
+ private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
+
private final int chunkSize;
private final boolean offheap;
@VisibleForTesting
@@ -75,7 +74,7 @@ public class ChunkCreator {
}
/**
- * Initializes the instance of MSLABChunkCreator
+ * Initializes the instance of ChunkCreator
* @param chunkSize the chunkSize
* @param offheap indicates if the chunk is to be created offheap or not
* @param globalMemStoreSize the global memstore size
@@ -100,10 +99,19 @@ public class ChunkCreator {
}
/**
- * Creates and inits a chunk.
+ * Creates and inits a chunk. The default implementation.
* @return the chunk that was initialized
*/
Chunk getChunk() {
+ return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
+ }
+
+ /**
+ * Creates and inits a chunk.
+ * @return the chunk that was initialized
+ * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
+ */
+ Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
Chunk chunk = null;
if (pool != null) {
// the pool creates the chunk internally. The chunk#init() call happens here
@@ -117,44 +125,49 @@ public class ChunkCreator {
}
}
if (chunk == null) {
- chunk = createChunk();
+ // the second parameter tells whether a CellChunkMap index is requested; in that case
+ // the mapping of the chunk allocated on demand is put into chunkIdMap
+ chunk = createChunk(false, chunkIndexType);
}
- // put this chunk into the chunkIdMap
- this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
+
// now we need to actually do the expensive memory allocation step in case of a new chunk,
// else only the offset is set to the beginning of the chunk to accept allocations
chunk.init();
return chunk;
}
- private Chunk createChunk() {
- return createChunk(false);
+ private Chunk createChunkForPool() {
+ return createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
}
/**
* Creates the chunk either onheap or offheap
* @param pool indicates if the chunks have to be created which will be used by the Pool
+ * @param chunkIndexType
* @return the chunk
*/
- private Chunk createChunk(boolean pool) {
+ private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType) {
+ Chunk chunk = null;
int id = chunkID.getAndIncrement();
assert id > 0;
// do not create offheap chunk on demand
if (pool && this.offheap) {
- return new OffheapChunk(chunkSize, id, pool);
+ chunk = new OffheapChunk(chunkSize, id, pool);
} else {
- return new OnheapChunk(chunkSize, id, pool);
+ chunk = new OnheapChunk(chunkSize, id, pool);
}
+ if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
+ // put the pool chunk into the chunkIdMap so it is not GC-ed
+ this.chunkIdMap.put(chunk.getId(), chunk);
+ }
+ return chunk;
}
@VisibleForTesting
- // TODO : To be used by CellChunkMap
+ // Used to translate the ChunkID into a chunk ref
Chunk getChunk(int id) {
- SoftReference<Chunk> ref = chunkIdMap.get(id);
- if (ref != null) {
- return ref.get();
- }
- return null;
+ // can return null if chunk was never mapped
+ return chunkIdMap.get(id);
}
int getChunkSize() {
@@ -170,15 +183,13 @@ public class ChunkCreator {
}
Chunk removeChunk(int chunkId) {
- SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
- if (ref != null) {
- return ref.get();
- }
- return null;
+ return this.chunkIdMap.remove(chunkId);
}
@VisibleForTesting
- int size() {
+ // the chunks in the chunkIdMap may already be released so we shouldn't rely
+ // on this counting for strong correctness. This method is used only in testing.
+ int numberOfMappedChunks() {
return this.chunkIdMap.size();
}
@@ -213,7 +224,8 @@ public class ChunkCreator {
this.poolSizePercentage = poolSizePercentage;
this.reclaimedChunks = new LinkedBlockingQueue<>();
for (int i = 0; i < initialCount; i++) {
- Chunk chunk = createChunk(true);
+ // Chunks from pool are covered with strong references anyway
+ Chunk chunk = createChunkForPool();
chunk.init();
reclaimedChunks.add(chunk);
}
@@ -232,7 +244,7 @@ public class ChunkCreator {
* then.
* Note: Chunks returned by this pool must be put back to the pool after its use.
* @return a chunk
- * @see #putbackChunks(Set)
+ * @see #putbackChunks(Chunk)
*/
Chunk getChunk() {
Chunk chunk = reclaimedChunks.poll();
@@ -245,7 +257,7 @@ public class ChunkCreator {
long created = this.chunkCount.get();
if (created < this.maxCount) {
if (this.chunkCount.compareAndSet(created, created + 1)) {
- chunk = createChunk(true);
+ chunk = createChunkForPool();
break;
}
} else {
@@ -259,21 +271,16 @@ public class ChunkCreator {
/**
* Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
* chunks
- * @param chunks
+ * @param c
*/
- private void putbackChunks(Set<Integer> chunks) {
- int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
- Iterator<Integer> iterator = chunks.iterator();
- while (iterator.hasNext()) {
- Integer chunkId = iterator.next();
- // remove the chunks every time though they are from the pool or not
- Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
- if (chunk != null) {
- if (chunk.isFromPool() && toAdd > 0) {
- reclaimedChunks.add(chunk);
- }
- toAdd--;
- }
+ private void putbackChunks(Chunk c) {
+ int toAdd = this.maxCount - reclaimedChunks.size();
+ if (c.isFromPool() && toAdd > 0) {
+ reclaimedChunks.add(c);
+ } else {
+ // remove the chunk (that is not going to pool)
+ // though it is initially from the pool or not
+ ChunkCreator.this.removeChunk(c.getId());
}
}
@@ -384,6 +391,20 @@ public class ChunkCreator {
return 0;
}
+ @VisibleForTesting
+ boolean isChunkInPool(int chunkId) {
+ if (pool != null) {
+ // chunks that are from the pool are mapped in chunkIdMap, so their lookup is not null
+ Chunk c = getChunk(chunkId);
+ if (c==null) {
+ return false;
+ }
+ return pool.reclaimedChunks.contains(c);
+ }
+
+ return false;
+ }
+
/*
* Only used in testing
*/
@@ -395,10 +416,24 @@ public class ChunkCreator {
}
synchronized void putbackChunks(Set<Integer> chunks) {
- if (pool != null) {
- pool.putbackChunks(chunks);
- } else {
+ // if there is no pool just try to clear the chunkIdMap in case there is something
+ if ( pool == null ) {
this.removeChunks(chunks);
+ return;
}
+
+ // if there is pool, go over all chunk IDs that came back, the chunks may be from pool or not
+ for (int chunkID : chunks) {
+ // translate chunk ID to chunk, if chunk initially wasn't in pool
+ // this translation will (most likely) return null
+ Chunk chunk = ChunkCreator.this.getChunk(chunkID);
+ if (chunk != null) {
+ pool.putbackChunks(chunk);
+ }
+ // if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also),
+ // so we have nothing to do on its release
+ }
+ return;
}
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 4de78ca..d554d85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -57,6 +57,12 @@ public class CompactingMemStore extends AbstractMemStore {
"hbase.hregion.compacting.memstore.type";
public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
String.valueOf(MemoryCompactionPolicy.BASIC);
+ // The external setting of the compacting MemStore behaviour
+ public static final String COMPACTING_MEMSTORE_INDEX_KEY =
+ "hbase.hregion.compacting.memstore.index";
+ // usage of CellArrayMap is default, later it will be decided how to use CellChunkMap
+ public static final String COMPACTING_MEMSTORE_INDEX_DEFAULT =
+ String.valueOf(IndexType.ARRAY_MAP);
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor";
@@ -78,10 +84,22 @@ public class CompactingMemStore extends AbstractMemStore {
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
private boolean compositeSnapshot = true;
+ /**
+ * Types of indexes (part of immutable segments) to be used after flattening,
+ * compaction, or merge are applied.
+ */
+ public enum IndexType {
+ CSLM_MAP, // ConcurrentSkipListMap
+ ARRAY_MAP, // CellArrayMap
+ CHUNK_MAP // CellChunkMap
+ }
+
+ private IndexType indexType = IndexType.ARRAY_MAP; // default implementation
public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
- + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
- // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+ + 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline,
+ // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
+ // indexType
+ Bytes.SIZEOF_LONG // inmemoryFlushSize
+ 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
@@ -96,6 +114,8 @@ public class CompactingMemStore extends AbstractMemStore {
this.pipeline = new CompactionPipeline(getRegionServices());
this.compactor = createMemStoreCompactor(compactionPolicy);
initInmemoryFlushSize(conf);
+ indexType = IndexType.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+ CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 06e83a3..97ea568 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -256,6 +256,8 @@ public class CompactionPipeline {
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
+ pipeline.removeAll(suffix);
+ if(segment != null) pipeline.addLast(segment);
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
// from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
@@ -267,8 +269,6 @@ public class CompactionPipeline {
itemInSuffix.close();
}
}
- pipeline.removeAll(suffix);
- if(segment != null) pipeline.addLast(segment);
}
public Segment getTail() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index ba53348..112f69e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -269,4 +269,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
}
return pooledChunks;
}
+
+ @VisibleForTesting Integer getNumOfChunksReturnedToPool() {
+ int i = 0;
+ for (Integer id : this.chunks) {
+ if (chunkCreator.isChunkInPool(id)) {
+ i++;
+ }
+ }
+ return i;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index f171dd0..06b9c40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -247,14 +247,16 @@ public class TestMemStoreLAB {
}
}
// none of the chunkIds would have been returned back
- assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0);
+ assertTrue("All the chunks must have been cleared",
+ ChunkCreator.INSTANCE.numberOfMappedChunks() != 0);
+ int pooledChunksNum = mslab.getPooledChunks().size();
// close the mslab
mslab.close();
- // make sure all chunks reclaimed or removed from chunk queue
- int queueLength = mslab.getPooledChunks().size();
+ // make sure all chunks were reclaimed back to pool
+ int queueLength = mslab.getNumOfChunksReturnedToPool();
assertTrue("All chunks in chunk queue should be reclaimed or removed"
- + " after mslab closed but actually: " + queueLength,
- queueLength == 0);
+ + " after mslab closed but actually: " + (pooledChunksNum-queueLength),
+ pooledChunksNum-queueLength == 0);
} finally {
ChunkCreator.INSTANCE = oldInstance;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
index 96be8ec..d3f9bc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
@@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool {
mslab[i].close();
}
// all of the chunkIds would have been returned back
- assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0);
+ assertTrue("All the chunks must have been cleared",
+ ChunkCreator.INSTANCE.numberOfMappedChunks() == 0);
}
private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,