You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2019/06/27 08:52:40 UTC

[hbase] 01/01: HBASE-22612 Address the final overview reviewing comments of HBASE-21879

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6a3aa789d8e94a0304966e72d25ab89598d14eda
Author: huzheng <op...@gmail.com>
AuthorDate: Mon Jun 24 12:09:47 2019 +0800

    HBASE-22612 Address the final overview reviewing comments of HBASE-21879
---
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  |  4 +--
 .../hbase/io/ByteBufferListOutputStream.java       |  2 +-
 .../apache/hadoop/hbase/io/hfile/HFileContext.java |  4 +--
 .../java/org/apache/hadoop/hbase/nio/RefCnt.java   |  6 ++---
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     | 15 ++++++-----
 .../hadoop/hbase/io/hfile/LruBlockCache.java       | 10 ++++++--
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 13 ++++++++--
 .../hadoop/hbase/io/hfile/CacheTestUtils.java      | 30 +++++++++++-----------
 .../hbase/io/hfile/bucket/TestBucketCache.java     |  2 +-
 .../hbase/mob/TestMobWithByteBuffAllocator.java    |  4 ++-
 10 files changed, 55 insertions(+), 35 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index 9991e79..7400b04 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -26,8 +26,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 
-import sun.nio.ch.DirectBuffer;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
+import sun.nio.ch.DirectBuffer;
+
 /**
  * ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and
  * it provide high-level interfaces for upstream. when allocating desired memory size, it will
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
index e8bd322..200c9b3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public class ByteBufferListOutputStream extends ByteBufferOutputStream {
   private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class);
 
-  private ByteBuffAllocator allocator;
+  private final ByteBuffAllocator allocator;
   // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If
   // it is not available will make a new one our own and keep writing to that. We keep track of all
   // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 6074f10..65649f4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -193,8 +193,8 @@ public class HFileContext implements HeapSize, Cloneable {
   }
 
   /**
-   * HeapSize implementation. NOTE : The heapsize should be altered as and when new state variable
-   * are added
+   * HeapSize implementation. NOTE: The heap size should be altered whenever new state variables
+   * are added.
    * @return heap size of the HFileContext
    */
   @Override
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
index 91c6ee7..018c8b4 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java
@@ -26,7 +26,7 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted;
 
 /**
  * Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the
- * reference count become 0, it'll call {@link Recycler#free()} once.
+ * reference count becomes 0, it'll call {@link Recycler#free()} exactly once.
  */
 @InterfaceAudience.Private
 public class RefCnt extends AbstractReferenceCounted {
@@ -36,8 +36,8 @@ public class RefCnt extends AbstractReferenceCounted {
   /**
    * Create an {@link RefCnt} with an initial reference count = 1. If the reference count become
    * zero, the recycler will do nothing. Usually, an Heap {@link ByteBuff} will use this kind of
-   * refCnt to track its life cycle, it help to abstract the code path although it's meaningless to
-   * use an refCnt for heap ByteBuff.
+   * refCnt to track its life cycle, it helps to abstract the code path although it's not really
+   * needed to track an on-heap ByteBuff.
    */
   public static RefCnt create() {
     return new RefCnt(ByteBuffAllocator.NONE);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 9cef9c0..c626426 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -920,7 +920,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         }
         if (block.getOffset() < 0) {
           releaseIfNotCurBlock(block);
-          throw new IOException("Invalid block file offset: " + block + ", path=" + reader.getPath());
+          throw new IOException(
+              "Invalid block file offset: " + block + ", path=" + reader.getPath());
         }
         // We are reading the next block without block type validation, because
         // it might turn out to be a non-data block.
@@ -1131,7 +1132,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding());
       if (newBlock.getOffset() < 0) {
         releaseIfNotCurBlock(newBlock);
-        throw new IOException("Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
+        throw new IOException(
+            "Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath());
       }
       updateCurrentBlock(newBlock);
     }
@@ -1339,7 +1341,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
             // schema definition change.
             LOG.info("Evicting cached block with key " + cacheKey
                 + " because of a data block encoding mismatch" + "; expected: "
-                + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding + ", path=" + path);
+                + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding + ", path="
+                + path);
             // This is an error scenario. so here we need to release the block.
             cachedBlock.release();
             cache.evictBlock(cacheKey);
@@ -1662,9 +1665,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
         short dataBlockEncoderId = newBlock.getDataBlockEncodingId();
         if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) {
           String encoderCls = dataBlockEncoder.getClass().getName();
-          throw new CorruptHFileException(
-              "Encoder " + encoderCls + " doesn't support data block encoding "
-                  + DataBlockEncoding.getNameFromId(dataBlockEncoderId) + ",path=" + reader.getPath());
+          throw new CorruptHFileException("Encoder " + encoderCls
+              + " doesn't support data block encoding "
+              + DataBlockEncoding.getNameFromId(dataBlockEncoderId) + ",path=" + reader.getPath());
         }
         updateCurrBlockRef(newBlock);
         ByteBuff encodedBuffer = getEncodedBuffer(newBlock);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 7740460..79ec0a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -153,8 +153,14 @@ public class LruBlockCache implements FirstLevelBlockCache {
   private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size";
   private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
 
-  /** Concurrent map (the cache) */
-  private transient final Map<BlockCacheKey, LruCachedBlock> map;
+  /**
+   * The cache map is declared as a {@link ConcurrentHashMap} explicitly, because in
+   * {@link LruBlockCache#getBlock} we need to guarantee the atomicity of
+   * map#computeIfPresent(key, func). Besides, the func must be executed exactly once, only when
+   * the key is present, and under the lock context; otherwise the reference count will be messed
+   * up. Notice that {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that.
+   */
+  private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map;
 
   /** Eviction lock (locked when eviction in process) */
   private transient final ReentrantLock evictionLock = new ReentrantLock(true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 799f23c..e6fd742 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -140,7 +140,7 @@ public class BucketCache implements BlockCache, HeapSize {
   transient final RAMCache ramCache;
   // In this map, store the block's meta data like offset, length
   @VisibleForTesting
-  transient ConcurrentMap<BlockCacheKey, BucketEntry> backingMap;
+  transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
 
   /**
    * Flag if the cache is enabled or not... We shut it off if there are IO
@@ -1524,7 +1524,16 @@ public class BucketCache implements BlockCache, HeapSize {
    * Wrapped the delegate ConcurrentMap with maintaining its block's reference count.
    */
   static class RAMCache {
-    final ConcurrentMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
+    /**
+     * The map is declared as a {@link ConcurrentHashMap} explicitly, because in
+     * {@link RAMCache#get(BlockCacheKey)} and
+     * {@link RAMCache#putIfAbsent(BlockCacheKey, RAMQueueEntry)}, we need to guarantee the
+     * atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the
+     * func must be executed exactly once, only when the key is present (or absent), and under the
+     * lock context. Otherwise, the reference count of the block will be messed up. Notice that
+     * {@link java.util.concurrent.ConcurrentSkipListMap} cannot guarantee that.
+     */
+    final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>();
 
     public boolean containsKey(BlockCacheKey key) {
       return delegate.containsKey(key);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index a7bb8e6..f69de00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -223,22 +223,22 @@ public class CacheTestUtils {
 
   public static class ByteArrayCacheable implements Cacheable {
 
-    static final CacheableDeserializer<Cacheable> blockDeserializer =
-        new CacheableDeserializer<Cacheable>() {
-          @Override
-          public int getDeserializerIdentifier() {
-            return deserializerIdentifier;
-          }
+    private static final CacheableDeserializer<Cacheable> blockDeserializer =
+      new CacheableDeserializer<Cacheable>() {
+        @Override
+        public int getDeserializerIdentifier() {
+          return deserializerIdentifier;
+        }
 
-          @Override
-          public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
-            int len = b.getInt();
-            Thread.yield();
-            byte buf[] = new byte[len];
-            b.get(buf);
-            return new ByteArrayCacheable(buf);
-          }
-        };
+        @Override
+        public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException {
+          int len = b.getInt();
+          Thread.yield();
+          byte[] buf = new byte[len];
+          b.get(buf);
+          return new ByteArrayCacheable(buf);
+        }
+      };
 
     final byte[] buf;
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 4ac7907..bbc2e53 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -51,9 +51,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
-import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.junit.After;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java
index e84af12..83d7d09 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java
@@ -101,11 +101,13 @@ public class TestMobWithByteBuffAllocator {
     int rows = 0;
     try (Table table = UTIL.getConnection().getTable(tableName)) {
       try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) {
-        for (Result res; (res = scanner.next()) != null;) {
+        Result res = scanner.next();
+        while (res != null) {
           rows++;
           for (Cell cell : res.listCells()) {
             Assert.assertTrue(CellUtil.cloneValue(cell).length > 0);
           }
+          res = scanner.next();
         }
       }
     }