You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2019/12/17 15:15:22 UTC

[hbase] branch branch-2 updated: HBASE-23066 Create a config that forces to cache blocks on compaction

This is an automated email from the ASF dual-hosted git repository.

ramkrishna pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new af0ce53  HBASE-23066 Create a config that forces to cache blocks on compaction
af0ce53 is described below

commit af0ce538366d8cefa5533faa0ad4ed5de50cbe7c
Author: Jacob Leblanc <ja...@apache.org>
AuthorDate: Tue Dec 17 20:44:12 2019 +0530

    HBASE-23066 Create a config that forces to cache blocks on compaction
---
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  |  23 ++++
 .../apache/hadoop/hbase/regionserver/HStore.java   |   4 +-
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java    | 125 ++++++++++++---------
 3 files changed, 100 insertions(+), 52 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index bb57fbe..7dbf4f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -81,6 +81,12 @@ public class CacheConfig {
    */
   public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen";
 
+  /**
+   * Configuration key to cache blocks when a compacted file is written
+   */
+  public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
+      "hbase.rs.cachecompactedblocksonwrite";
+
   public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
       "hbase.hfile.drop.behind.compaction";
 
@@ -93,6 +99,7 @@ public class CacheConfig {
   public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
   public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
   public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
+  public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
   public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
 
   /**
@@ -124,6 +131,11 @@ public class CacheConfig {
   /** Whether data blocks should be prefetched into the cache */
   private final boolean prefetchOnOpen;
 
+  /**
+   * Whether data blocks should be cached when compacted file is written
+   */
+  private final boolean cacheCompactedDataOnWrite;
+
   private final boolean dropBehindCompaction;
 
   // Local reference to the block cache
@@ -174,6 +186,8 @@ public class CacheConfig {
         (family == null ? false : family.isEvictBlocksOnClose());
     this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) ||
         (family == null ? false : family.isPrefetchBlocksOnOpen());
+    this.cacheCompactedDataOnWrite = conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+      DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
     this.blockCache = blockCache;
     this.byteBuffAllocator = byteBuffAllocator;
     LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) +
@@ -193,6 +207,7 @@ public class CacheConfig {
     this.evictOnClose = cacheConf.evictOnClose;
     this.cacheDataCompressed = cacheConf.cacheDataCompressed;
     this.prefetchOnOpen = cacheConf.prefetchOnOpen;
+    this.cacheCompactedDataOnWrite = cacheConf.cacheCompactedDataOnWrite;
     this.dropBehindCompaction = cacheConf.dropBehindCompaction;
     this.blockCache = cacheConf.blockCache;
     this.byteBuffAllocator = cacheConf.byteBuffAllocator;
@@ -207,6 +222,7 @@ public class CacheConfig {
     this.evictOnClose = false;
     this.cacheDataCompressed = false;
     this.prefetchOnOpen = false;
+    this.cacheCompactedDataOnWrite = false;
     this.dropBehindCompaction = false;
     this.blockCache = null;
     this.byteBuffAllocator = ByteBuffAllocator.HEAP;
@@ -320,6 +336,13 @@ public class CacheConfig {
   }
 
   /**
+   * @return true if blocks should be cached while writing during compaction, false if not
+   */
+  public boolean shouldCacheCompactedBlocksOnWrite() {
+    return this.cacheCompactedDataOnWrite;
+  }
+
+  /**
    * Return true if we may find this type of block in block cache.
    * <p>
    * TODO: today {@code family.isBlockCacheEnabled()} only means {@code cacheDataOnRead}, so here we
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c0bd0dd..bc0d694 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1118,9 +1118,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
       boolean shouldDropBehind) throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
-      // Don't cache data on write on compactions.
+      // Don't cache data on write on compactions, unless specifically configured to do so
       writerCacheConf = new CacheConfig(cacheConf);
-      writerCacheConf.setCacheDataOnWrite(false);
+      writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
     } else {
       writerCacheConf = cacheConf;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 3a769b0..6f218de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -405,59 +405,84 @@ public class TestCacheOnWrite {
     storeFilePath = sfw.getPath();
   }
 
-  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
-      throws IOException, InterruptedException {
-    // TODO: need to change this test if we add a cache size threshold for
-    // compactions, or if we implement some other kind of intelligent logic for
-    // deciding what blocks to cache-on-write on compaction.
-    final String table = "CompactionCacheOnWrite";
-    final String cf = "myCF";
-    final byte[] cfBytes = Bytes.toBytes(cf);
-    final int maxVersions = 3;
-    ColumnFamilyDescriptor cfd =
-        ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress)
-            .setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
-            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
-    HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
-    int rowIdx = 0;
-    long ts = EnvironmentEdgeManager.currentTime();
-    for (int iFile = 0; iFile < 5; ++iFile) {
-      for (int iRow = 0; iRow < 500; ++iRow) {
-        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
-            iRow;
-        Put p = new Put(Bytes.toBytes(rowStr));
-        ++rowIdx;
-        for (int iCol = 0; iCol < 10; ++iCol) {
-          String qualStr = "col" + iCol;
-          String valueStr = "value_" + rowStr + "_" + qualStr;
-          for (int iTS = 0; iTS < 5; ++iTS) {
-            if (useTags) {
-              Tag t = new ArrayBackedTag((byte) 1, "visibility");
-              Tag[] tags = new Tag[1];
-              tags[0] = t;
-              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
-                  HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
-              p.add(kv);
-            } else {
-              p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+  private void testCachingDataBlocksDuringCompactionInternals(boolean useTags,
+      boolean cacheBlocksOnCompaction) throws IOException, InterruptedException {
+    // create a localConf
+    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+      false);
+    try {
+      // Set the conf if testing caching compacted blocks on write
+      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+        cacheBlocksOnCompaction);
+
+      // TODO: need to change this test if we add a cache size threshold for
+      // compactions, or if we implement some other kind of intelligent logic for
+      // deciding what blocks to cache-on-write on compaction.
+      final String table = "CompactionCacheOnWrite";
+      final String cf = "myCF";
+      final byte[] cfBytes = Bytes.toBytes(cf);
+      final int maxVersions = 3;
+      ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(cfBytes)
+          .setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
+          .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
+      HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
+      int rowIdx = 0;
+      long ts = EnvironmentEdgeManager.currentTime();
+      for (int iFile = 0; iFile < 5; ++iFile) {
+        for (int iRow = 0; iRow < 500; ++iRow) {
+          String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
+          Put p = new Put(Bytes.toBytes(rowStr));
+          ++rowIdx;
+          for (int iCol = 0; iCol < 10; ++iCol) {
+            String qualStr = "col" + iCol;
+            String valueStr = "value_" + rowStr + "_" + qualStr;
+            for (int iTS = 0; iTS < 5; ++iTS) {
+              if (useTags) {
+                Tag t = new ArrayBackedTag((byte) 1, "visibility");
+                Tag[] tags = new Tag[1];
+                tags[0] = t;
+                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+                    HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
+                p.add(kv);
+              } else {
+                p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+              }
             }
           }
+          p.setDurability(Durability.ASYNC_WAL);
+          region.put(p);
+        }
+        region.flush(true);
+      }
+
+      clearBlockCache(blockCache);
+      assertEquals(0, blockCache.getBlockCount());
+
+      region.compact(false);
+      LOG.debug("compactStores() returned");
+
+      boolean dataBlockCached = false;
+      for (CachedBlock block : blockCache) {
+        if (BlockType.ENCODED_DATA.equals(block.getBlockType())
+            || BlockType.DATA.equals(block.getBlockType())) {
+          dataBlockCached = true;
+          break;
         }
-        p.setDurability(Durability.ASYNC_WAL);
-        region.put(p);
       }
-      region.flush(true);
-    }
-    clearBlockCache(blockCache);
-    assertEquals(0, blockCache.getBlockCount());
-    region.compact(false);
-    LOG.debug("compactStores() returned");
 
-    for (CachedBlock block: blockCache) {
-      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
-      assertNotEquals(BlockType.DATA, block.getBlockType());
+      // Data blocks should be cached in instances where we are caching blocks on write. In the case
+      // of testing
+      // BucketCache, we cannot verify block type as it is not stored in the cache.
+      assertTrue(
+        "\nTest description: " + testDescription + "\ncacheBlocksOnCompaction: "
+            + cacheBlocksOnCompaction + "\n",
+        (cacheBlocksOnCompaction && !(blockCache instanceof BucketCache)) == dataBlockCached);
+
+      region.close();
+    } finally {
+      // reset back
+      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
     }
-    region.close();
   }
 
   @Test
@@ -467,8 +492,8 @@ public class TestCacheOnWrite {
   }
 
   @Test
-  public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
-    testNotCachingDataBlocksDuringCompactionInternals(false);
-    testNotCachingDataBlocksDuringCompactionInternals(true);
+  public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException {
+    testCachingDataBlocksDuringCompactionInternals(false, false);
+    testCachingDataBlocksDuringCompactionInternals(true, true);
   }
 }