You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zy...@apache.org on 2018/07/10 04:38:52 UTC

[30/50] [abbrv] hbase git commit: HBASE-20789 TestBucketCache#testCacheBlockNextBlockMetadataMissing is flaky

HBASE-20789 TestBucketCache#testCacheBlockNextBlockMetadataMissing is flaky


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0454878e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0454878e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0454878e

Branch: refs/heads/HBASE-18477
Commit: 0454878e71e4fdb2d9c9cc130d02657f64babea0
Parents: 66ad9fd
Author: huzheng <hu...@xiaomi.com>
Authored: Thu Jun 28 12:01:03 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Tue Jul 3 17:56:34 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/io/hfile/BlockCacheUtil.java   | 55 +++++++++++---
 .../hadoop/hbase/io/hfile/LruBlockCache.java    | 17 +----
 .../hbase/io/hfile/bucket/BucketCache.java      | 78 ++++++++++++--------
 .../hbase/io/hfile/bucket/TestBucketCache.java  | 33 ++++++---
 4 files changed, 116 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 8a100ae..36f9e61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -198,20 +198,55 @@ public class BlockCacheUtil {
    */
   public static int validateBlockAddition(Cacheable existing, Cacheable newBlock,
                                           BlockCacheKey cacheKey) {
-    int comparison = compareCacheBlock(existing, newBlock, true);
+    int comparison = compareCacheBlock(existing, newBlock, false);
     if (comparison != 0) {
-      LOG.warn("Cached block contents differ, trying to just compare the block contents " +
-          "without the next block. CacheKey: " + cacheKey);
-
-      // compare the contents, if they are not equal, we are in big trouble
-      int comparisonWithoutNextBlockMetadata = compareCacheBlock(existing, newBlock, false);
+      throw new RuntimeException("Cached block contents differ, which should not have happened."
+                                 + "cacheKey:" + cacheKey);
+    }
+    if ((existing instanceof HFileBlock) && (newBlock instanceof HFileBlock)) {
+      comparison = ((HFileBlock) existing).getNextBlockOnDiskSize()
+          - ((HFileBlock) newBlock).getNextBlockOnDiskSize();
+    }
+    return comparison;
+  }
 
-      if (comparisonWithoutNextBlockMetadata != 0) {
-        throw new RuntimeException("Cached block contents differ, which should not have happened."
-            + "cacheKey:" + cacheKey);
+  /**
+   * Because of region splitting, the split key may be located in the middle of a block, so both
+   * daughter regions may load the same block from their parent HFile. When using pread, we don't
+   * force the read to include all of the next block header. So when two threads try to cache the
+   * same block, it's possible that one thread read all of the next block header but the other one
+   * didn't. If the already cached block doesn't have the next block header but the new block to
+   * cache does, then we can replace the existing block with the new block for better
+   * performance. (HBASE-20447)
+   * @param blockCache BlockCache to check
+   * @param cacheKey the block cache key
+   * @param newBlock the new block that is about to be put into the block cache.
+   * @return true if the existing block should be replaced with the new block for the same block
+   *         cache key; false if the existing block should simply be kept.
+   */
+  public static boolean shouldReplaceExistingCacheBlock(BlockCache blockCache,
+      BlockCacheKey cacheKey, Cacheable newBlock) {
+    Cacheable existingBlock = blockCache.getBlock(cacheKey, false, false, false);
+    try {
+      int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, newBlock, cacheKey);
+      if (comparison < 0) {
+        LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the new block has "
+            + "nextBlockOnDiskSize set. Caching new block.");
+        return true;
+      } else if (comparison > 0) {
+        LOG.warn("Cached block contents differ by nextBlockOnDiskSize, the existing block has "
+            + "nextBlockOnDiskSize set, Keeping cached block.");
+        return false;
+      } else {
+        LOG.warn("Caching an already cached block: {}. This is harmless and can happen in rare "
+            + "cases (see HBASE-8547)",
+          cacheKey);
+        return false;
       }
+    } finally {
+      // return the block since we need to decrement the count
+      blockCache.returnBlock(cacheKey, existingBlock);
     }
-    return comparison;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 8b8bd88..1dab053 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -379,21 +379,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     }
 
     LruCachedBlock cb = map.get(cacheKey);
-    if (cb != null) {
-      int comparison = BlockCacheUtil.validateBlockAddition(cb.getBuffer(), buf, cacheKey);
-      if (comparison != 0) {
-        if (comparison < 0) {
-          LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
-          return;
-        } else {
-          LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block.");
-        }
-      } else {
-        String msg = "Cached an already cached block: " + cacheKey + " cb:" + cb.getCacheKey();
-        msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
-        LOG.debug(msg);
-        return;
-      }
+    if (cb != null && !BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, buf)) {
+      return;
     }
     long currentSize = size.get();
     long currentAcceptableSize = acceptableSize();

http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index bc45d39..40c0a00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -418,42 +418,35 @@ public class BucketCache implements BlockCache, HeapSize {
    * @param inMemory if block is in-memory
    * @param wait if true, blocking wait when queue is full
    */
-  public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+  private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
       boolean wait) {
-    if (LOG.isTraceEnabled()) LOG.trace("Caching key=" + cacheKey + ", item=" + cachedItem);
-    if (!cacheEnabled) {
-      return;
-    }
-
-    if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
-      Cacheable existingBlock = getBlock(cacheKey, false, false, false);
-
-      try {
-        int comparison = BlockCacheUtil.validateBlockAddition(existingBlock, cachedItem, cacheKey);
-        if (comparison != 0) {
-          if (comparison < 0) {
-            LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Keeping cached block.");
-            return;
-          } else {
-            LOG.warn("Cached block contents differ by nextBlockOnDiskSize. Caching new block.");
-          }
-        } else {
-          String msg = "Caching an already cached block: " + cacheKey;
-          msg += ". This is harmless and can happen in rare cases (see HBASE-8547)";
-          LOG.warn(msg);
-          return;
+    if (cacheEnabled) {
+      if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
+        if (BlockCacheUtil.shouldReplaceExistingCacheBlock(this, cacheKey, cachedItem)) {
+          cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
         }
-      } finally {
-        // return the block since we need to decrement the count
-        returnBlock(cacheKey, existingBlock);
+      } else {
+        cacheBlockWithWaitInternal(cacheKey, cachedItem, inMemory, wait);
       }
     }
+  }
 
-    /*
-     * Stuff the entry into the RAM cache so it can get drained to the persistent store
-     */
+  private void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
+      boolean inMemory, boolean wait) {
+    if (!cacheEnabled) {
+      return;
+    }
+    LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
+    // Stuff the entry into the RAM cache so it can get drained to the persistent store
     RAMQueueEntry re =
         new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
+    /**
+     * Don't use ramCache.put(cacheKey, re) here, because there may be an existing entry with the
+     * same key in ramCache, and the heap size of the bucket cache needs to be updated when an
+     * entry in ramCache is replaced. But WriterThread will also remove entries from ramCache and
+     * update the heap size; if ramCache.put() were used, the entry removed by WriterThread might
+     * not be the correct one, and the heap size would then be messed up (HBASE-20789).
+     */
     if (ramCache.putIfAbsent(cacheKey, re) != null) {
       return;
     }
@@ -937,6 +930,31 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     /**
+     * Put the new bucket entry into backingMap. Notice that we are allowed to replace the existing
+     * cache with a new block for the same cache key. There's a corner case: one thread caches a
+     * block in ramCache, copies it to the io-engine and adds a bucket entry to backingMap. Caching
+     * another new block with the same cache key does the same thing, so if we do not evict the
+     * previous bucket entry, a memory leak happens because the previous bucketEntry is gone but
+     * the bucketAllocator does not free its memory.
+     * @see BlockCacheUtil#shouldReplaceExistingCacheBlock(BlockCache blockCache,BlockCacheKey
+     *      cacheKey, Cacheable newBlock)
+     * @param key Block cache key
+     * @param bucketEntry Bucket entry to put into backingMap.
+     */
+    private void putIntoBackingMap(BlockCacheKey key, BucketEntry bucketEntry) {
+      BucketEntry previousEntry = backingMap.put(key, bucketEntry);
+      if (previousEntry != null && previousEntry != bucketEntry) {
+        ReentrantReadWriteLock lock = offsetLock.getLock(previousEntry.offset());
+        lock.writeLock().lock();
+        try {
+          blockEvicted(key, previousEntry, false);
+        } finally {
+          lock.writeLock().unlock();
+        }
+      }
+    }
+
+    /**
      * Flush the entries in ramCache to IOEngine and add bucket entry to backingMap.
      * Process all that are passed in even if failure being sure to remove from ramCache else we'll
      * never undo the references and we'll OOME.
@@ -1017,7 +1035,7 @@ public class BucketCache implements BlockCache, HeapSize {
         BlockCacheKey key = entries.get(i).getKey();
         // Only add if non-null entry.
         if (bucketEntries[i] != null) {
-          backingMap.put(key, bucketEntries[i]);
+          putIntoBackingMap(key, bucketEntries[i]);
         }
         // Always remove from ramCache even if we failed adding it to the block cache above.
         RAMQueueEntry ramCacheEntry = ramCache.remove(key);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0454878e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index b365669..924dd02 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -197,14 +197,19 @@ public class TestBucketCache {
     CacheTestUtils.testHeapSizeChanges(cache, BLOCK_SIZE);
   }
 
+  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
+      throws InterruptedException {
+    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
+      Thread.sleep(100);
+    }
+  }
+
   // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
   // threads will flush it to the bucket and put reference entry in backingMap.
   private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
       Cacheable block) throws InterruptedException {
     cache.cacheBlock(cacheKey, block);
-    while (!cache.backingMap.containsKey(cacheKey)) {
-      Thread.sleep(100);
-    }
+    waitUntilFlushedToBucket(cache, cacheKey);
   }
 
   @Test
@@ -409,7 +414,7 @@ public class TestBucketCache {
   }
 
   @Test
-  public void testCacheBlockNextBlockMetadataMissing() {
+  public void testCacheBlockNextBlockMetadataMissing() throws Exception {
     int size = 100;
     int length = HConstants.HFILEBLOCK_HEADER_SIZE + size;
     byte[] byteArr = new byte[length];
@@ -427,22 +432,26 @@ public class TestBucketCache {
     blockWithNextBlockMetadata.serialize(block1Buffer, true);
     blockWithoutNextBlockMetadata.serialize(block2Buffer, true);
 
-    //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
+    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata back.
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
-        block1Buffer);
+      block1Buffer);
 
-    //Add blockWithoutNextBlockMetada, expect blockWithNextBlockMetadata back.
+    waitUntilFlushedToBucket(cache, key);
+
+    // Add blockWithoutNextBlockMetadata, expect blockWithNextBlockMetadata back.
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
-        block1Buffer);
+      block1Buffer);
 
-    //Clear and add blockWithoutNextBlockMetadata
+    // Clear and add blockWithoutNextBlockMetadata
     cache.evictBlock(key);
     assertNull(cache.getBlock(key, false, false, false));
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithoutNextBlockMetadata, actualBuffer,
-        block2Buffer);
+      block2Buffer);
+
+    waitUntilFlushedToBucket(cache, key);
 
-    //Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
+    // Add blockWithNextBlockMetadata, expect blockWithNextBlockMetadata to replace.
     CacheTestUtils.getBlockAndAssertEquals(cache, key, blockWithNextBlockMetadata, actualBuffer,
-        block1Buffer);
+      block1Buffer);
   }
 }