You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2017/09/06 15:49:03 UTC

hbase git commit: HBASE-18375: Fix the bug where the pool chunks from ChunkCreator are deallocated and not returned to pool, because there is no reference to them

Repository: hbase
Updated Branches:
  refs/heads/branch-2 d7a74a75a -> 68ec2a9da


HBASE-18375: Fix the bug where pool chunks from ChunkCreator are garbage-collected instead of being returned to the pool, because only soft references to them were kept


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/68ec2a9d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/68ec2a9d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/68ec2a9d

Branch: refs/heads/branch-2
Commit: 68ec2a9da022f7824e9a45ef89a0c4b8bcb838f3
Parents: d7a74a7
Author: anastas <an...@yahoo-inc.com>
Authored: Wed Sep 6 18:48:53 2017 +0300
Committer: anastas <an...@yahoo-inc.com>
Committed: Wed Sep 6 18:48:53 2017 +0300

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/ChunkCreator.java | 127 ++++++++++++-------
 .../hbase/regionserver/CompactingMemStore.java  |  24 +++-
 .../hbase/regionserver/CompactionPipeline.java  |   4 +-
 .../hbase/regionserver/MemStoreLABImpl.java     |  10 ++
 .../hbase/regionserver/TestMemStoreLAB.java     |  12 +-
 .../TestMemstoreLABWithoutPool.java             |   3 +-
 6 files changed, 124 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
index 38d7136..61cf2b3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java
@@ -50,10 +50,9 @@ public class ChunkCreator {
   // monotonically increasing chunkid
   private AtomicInteger chunkID = new AtomicInteger(1);
   // maps the chunk against the monotonically increasing chunk id. We need to preserve the
-  // natural ordering of the key
-  // CellChunkMap creation should convert the soft ref to hard reference
-  private Map<Integer, SoftReference<Chunk>> chunkIdMap =
-      new ConcurrentHashMap<Integer, SoftReference<Chunk>>();
+  // natural ordering of the key. It also helps to protect from GC.
+  private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>();
+
   private final int chunkSize;
   private final boolean offheap;
   @VisibleForTesting
@@ -75,7 +74,7 @@ public class ChunkCreator {
   }
 
   /**
-   * Initializes the instance of MSLABChunkCreator
+   * Initializes the instance of ChunkCreator
    * @param chunkSize the chunkSize
    * @param offheap indicates if the chunk is to be created offheap or not
    * @param globalMemStoreSize  the global memstore size
@@ -100,10 +99,19 @@ public class ChunkCreator {
   }
 
   /**
-   * Creates and inits a chunk.
+   * Creates and inits a chunk for use with the default (CellArrayMap) index.
    * @return the chunk that was initialized
    */
   Chunk getChunk() {
+    return getChunk(CompactingMemStore.IndexType.ARRAY_MAP);
+  }
+
+  /**
+   * Creates and inits a chunk.
+   * @return the chunk that was initialized
+   * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index
+   */
+  Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) {
     Chunk chunk = null;
     if (pool != null) {
       //  the pool creates the chunk internally. The chunk#init() call happens here
@@ -117,44 +125,49 @@ public class ChunkCreator {
       }
     }
     if (chunk == null) {
-      chunk = createChunk();
+      // 'false': this chunk is not created for the pool; chunkIndexType decides whether the
+      // on-demand chunk is mapped in chunkIdMap (only when a CellChunkMap index is requested)
+      chunk = createChunk(false, chunkIndexType);
     }
-    // put this chunk into the chunkIdMap
-    this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk));
+
     // now we need to actually do the expensive memory allocation step in case of a new chunk,
     // else only the offset is set to the beginning of the chunk to accept allocations
     chunk.init();
     return chunk;
   }
 
-  private Chunk createChunk() {
-    return createChunk(false);
+  private Chunk createChunkForPool() {
+    return createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP);
   }
 
   /**
    * Creates the chunk either onheap or offheap
    * @param pool indicates if the chunks have to be created which will be used by the Pool
+   * @param chunkIndexType the index type the chunk is for; CHUNK_MAP chunks are kept in chunkIdMap
    * @return the chunk
    */
-  private Chunk createChunk(boolean pool) {
+  private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType) {
+    Chunk chunk = null;
     int id = chunkID.getAndIncrement();
     assert id > 0;
     // do not create offheap chunk on demand
     if (pool && this.offheap) {
-      return new OffheapChunk(chunkSize, id, pool);
+      chunk = new OffheapChunk(chunkSize, id, pool);
     } else {
-      return new OnheapChunk(chunkSize, id, pool);
+      chunk = new OnheapChunk(chunkSize, id, pool);
     }
+    if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) {
+      // map pool chunks and CellChunkMap chunks in chunkIdMap so they are not GC-ed
+      this.chunkIdMap.put(chunk.getId(), chunk);
+    }
+    return chunk;
   }
 
   @VisibleForTesting
-  // TODO : To be used by CellChunkMap
+  // Used to translate the ChunkID into a chunk ref
   Chunk getChunk(int id) {
-    SoftReference<Chunk> ref = chunkIdMap.get(id);
-    if (ref != null) {
-      return ref.get();
-    }
-    return null;
+    // can return null if chunk was never mapped
+    return chunkIdMap.get(id);
   }
 
   int getChunkSize() {
@@ -170,15 +183,13 @@ public class ChunkCreator {
   }
 
   Chunk removeChunk(int chunkId) {
-    SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId);
-    if (ref != null) {
-      return ref.get();
-    }
-    return null;
+    return this.chunkIdMap.remove(chunkId);
   }
 
   @VisibleForTesting
-  int size() {
+  // the chunks in the chunkIdMap may already be released so we shouldn't rely
+  // on this count for strict correctness. This method is used only in testing.
+  int numberOfMappedChunks() {
     return this.chunkIdMap.size();
   }
 
@@ -213,7 +224,8 @@ public class ChunkCreator {
       this.poolSizePercentage = poolSizePercentage;
       this.reclaimedChunks = new LinkedBlockingQueue<>();
       for (int i = 0; i < initialCount; i++) {
-        Chunk chunk = createChunk(true);
+        // Chunks from pool are covered with strong references anyway
+        Chunk chunk = createChunkForPool();
         chunk.init();
         reclaimedChunks.add(chunk);
       }
@@ -232,7 +244,7 @@ public class ChunkCreator {
      * then.
      * Note: Chunks returned by this pool must be put back to the pool after its use.
      * @return a chunk
-     * @see #putbackChunks(Set)
+     * @see #putbackChunks(Chunk)
      */
     Chunk getChunk() {
       Chunk chunk = reclaimedChunks.poll();
@@ -245,7 +257,7 @@ public class ChunkCreator {
           long created = this.chunkCount.get();
           if (created < this.maxCount) {
             if (this.chunkCount.compareAndSet(created, created + 1)) {
-              chunk = createChunk(true);
+              chunk = createChunkForPool();
               break;
             }
           } else {
@@ -259,21 +271,16 @@ public class ChunkCreator {
     /**
      * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining
      * chunks
-     * @param chunks
+     * @param c the chunk to return; re-queued if it is from the pool and the pool has room
      */
-    private void putbackChunks(Set<Integer> chunks) {
-      int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size());
-      Iterator<Integer> iterator = chunks.iterator();
-      while (iterator.hasNext()) {
-        Integer chunkId = iterator.next();
-        // remove the chunks every time though they are from the pool or not
-        Chunk chunk = ChunkCreator.this.removeChunk(chunkId);
-        if (chunk != null) {
-          if (chunk.isFromPool() && toAdd > 0) {
-            reclaimedChunks.add(chunk);
-          }
-          toAdd--;
-        }
+    private void putbackChunks(Chunk c) {
+      int toAdd = this.maxCount - reclaimedChunks.size();
+      if (c.isFromPool() && toAdd > 0) {
+        reclaimedChunks.add(c);
+      } else {
+        // the chunk is not going back to the pool: unmap it from chunkIdMap,
+        // whether or not it originally came from the pool
+        ChunkCreator.this.removeChunk(c.getId());
       }
     }
 
@@ -384,6 +391,20 @@ public class ChunkCreator {
     return 0;
   }
 
+  @VisibleForTesting
+  boolean isChunkInPool(int chunkId) {
+    if (pool != null) {
+      // only chunks mapped in chunkIdMap (e.g. pool chunks) resolve to a non-null reference
+      Chunk c = getChunk(chunkId);
+      if (c==null) {
+        return false;
+      }
+      return pool.reclaimedChunks.contains(c);
+    }
+
+    return false;
+  }
+
   /*
    * Only used in testing
    */
@@ -395,10 +416,24 @@ public class ChunkCreator {
   }
 
   synchronized void putbackChunks(Set<Integer> chunks) {
-    if (pool != null) {
-      pool.putbackChunks(chunks);
-    } else {
+    // if there is no pool just try to clear the chunkIdMap in case there is something
+    if ( pool == null ) {
       this.removeChunks(chunks);
+      return;
     }
+
+    // if there is pool, go over all chunk IDs that came back, the chunks may be from pool or not
+    for (int chunkID : chunks) {
+      // translate chunk ID to chunk, if chunk initially wasn't in pool
+      // this translation will (most likely) return null
+      Chunk chunk = ChunkCreator.this.getChunk(chunkID);
+      if (chunk != null) {
+        pool.putbackChunks(chunk);
+      }
+      // if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also),
+      // so we have nothing to do on its release
+    }
+    return;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
index 4de78ca..d554d85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java
@@ -57,6 +57,12 @@ public class CompactingMemStore extends AbstractMemStore {
       "hbase.hregion.compacting.memstore.type";
   public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
       String.valueOf(MemoryCompactionPolicy.BASIC);
+  // The external setting of the compacting MemStore index type
+  public static final String COMPACTING_MEMSTORE_INDEX_KEY =
+      "hbase.hregion.compacting.memstore.index";
+  // usage of CellArrayMap is default, later it will be decided how to use CellChunkMap
+  public static final String COMPACTING_MEMSTORE_INDEX_DEFAULT =
+      String.valueOf(IndexType.ARRAY_MAP);
   // Default fraction of in-memory-flush size w.r.t. flush-to-disk size
   public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
       "hbase.memstore.inmemoryflush.threshold.factor";
@@ -78,10 +84,22 @@ public class CompactingMemStore extends AbstractMemStore {
   private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
   private boolean compositeSnapshot = true;
 
+  /**
+   * Types of indexes (part of immutable segments) to be used after flattening,
+   * compaction, or merge are applied.
+   */
+  public enum IndexType {
+    CSLM_MAP,   // ConcurrentSkipListMap
+    ARRAY_MAP,  // CellArrayMap
+    CHUNK_MAP   // CellChunkMap
+  }
+
+  private IndexType indexType = IndexType.ARRAY_MAP;  // default implementation
 
   public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
-      + 6 * ClassSize.REFERENCE     // Store, RegionServicesForStores, CompactionPipeline,
-                                    // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction
+      + 7 * ClassSize.REFERENCE     // Store, RegionServicesForStores, CompactionPipeline,
+                                    // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction,
+                                    // indexType
       + Bytes.SIZEOF_LONG           // inmemoryFlushSize
       + 2 * Bytes.SIZEOF_BOOLEAN    // compositeSnapshot and inWalReplay
       + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction
@@ -96,6 +114,8 @@ public class CompactingMemStore extends AbstractMemStore {
     this.pipeline = new CompactionPipeline(getRegionServices());
     this.compactor = createMemStoreCompactor(compactionPolicy);
     initInmemoryFlushSize(conf);
+    indexType = IndexType.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY,
+        CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT));
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 06e83a3..97ea568 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -256,6 +256,8 @@ public class CompactionPipeline {
 
   private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
       boolean closeSegmentsInSuffix) {
+    pipeline.removeAll(suffix);
+    if(segment != null) pipeline.addLast(segment);
     // During index merge we won't be closing the segments undergoing the merge. Segment#close()
     // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
     // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
@@ -267,8 +269,6 @@ public class CompactionPipeline {
         itemInSuffix.close();
       }
     }
-    pipeline.removeAll(suffix);
-    if(segment != null) pipeline.addLast(segment);
   }
 
   public Segment getTail() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
index ba53348..112f69e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java
@@ -269,4 +269,14 @@ public class MemStoreLABImpl implements MemStoreLAB {
     }
     return pooledChunks;
   }
+
+  @VisibleForTesting Integer getNumOfChunksReturnedToPool() {
+    int i = 0;
+    for (Integer id : this.chunks) {
+      if (chunkCreator.isChunkInPool(id)) {
+        i++;
+      }
+    }
+    return i;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
index f171dd0..06b9c40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java
@@ -247,14 +247,16 @@ public class TestMemStoreLAB {
         }
       }
       // none of the chunkIds would have been returned back
-      assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0);
+      assertTrue("All the chunks must have been cleared",
+          ChunkCreator.INSTANCE.numberOfMappedChunks() != 0);
+      int pooledChunksNum = mslab.getPooledChunks().size();
       // close the mslab
       mslab.close();
-      // make sure all chunks reclaimed or removed from chunk queue
-      int queueLength = mslab.getPooledChunks().size();
+      // make sure all chunks were reclaimed back to pool
+      int queueLength = mslab.getNumOfChunksReturnedToPool();
       assertTrue("All chunks in chunk queue should be reclaimed or removed"
-          + " after mslab closed but actually: " + queueLength,
-        queueLength == 0);
+          + " after mslab closed but actually: " + (pooledChunksNum-queueLength),
+          pooledChunksNum-queueLength == 0);
     } finally {
       ChunkCreator.INSTANCE = oldInstance;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
index 96be8ec..d3f9bc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java
@@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool {
       mslab[i].close();
     }
     // all of the chunkIds would have been returned back
-    assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0);
+    assertTrue("All the chunks must have been cleared",
+        ChunkCreator.INSTANCE.numberOfMappedChunks() == 0);
   }
 
   private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,