Posted to commits@hbase.apache.org by mb...@apache.org on 2012/01/25 20:41:08 UTC

svn commit: r1235882 - in /hbase/trunk/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ test/java/org/apache/hadoop/hbase/client/ test/java/org/apac...

Author: mbautin
Date: Wed Jan 25 19:41:07 2012
New Revision: 1235882

URL: http://svn.apache.org/viewvc?rev=1235882&view=rev
Log:
HBASE-5230: Ensure that compactions do not cache-on-write data blocks

Data blocks written out by a compaction are not necessarily hot, so caching
them on write would churn the block cache. With this change the compaction
writer gets its own CacheConfig copy with cache-on-write disabled for data
blocks, while flushes keep the configured behavior.

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1235882&r1=1235881&r2=1235882&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Wed Jan 25 19:41:07 2012
@@ -289,7 +289,7 @@ public class HFileReaderV2 extends Abstr
       }
       getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
 
-      // Cache the block
+      // Cache the block if necessary
       if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
               hfileBlock.getBlockType().getCategory())) {
         cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
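
Two independent gates guard read-path caching here: the caller's cacheBlock
flag and the per-category CacheConfig check. Compaction reads already keep
the cache clean via the first gate; a hedged sketch of how a compaction
opens its scanners with caching disabled (call shape assumed from the
0.92-era API, not part of this diff):

    // Sketch only: compaction scans open store file scanners with block
    // caching turned off, so cacheBlock above is false and even a cache
    // miss during a compaction does not populate the block cache.
    List<StoreFileScanner> scanners =
        StoreFileScanner.getScannersForStoreFiles(
            filesToCompact, false /* cacheBlocks */, false /* usePread */);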

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1235882&r1=1235881&r2=1235882&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Wed Jan 25 19:41:07 2012
@@ -242,10 +242,7 @@ public class HFileWriterV2 extends Abstr
     HFile.writeOps.incrementAndGet();
 
     if (cacheConf.shouldCacheDataOnWrite()) {
-      HFileBlock blockForCaching = fsBlockWriter.getBlockForCaching();
-      passSchemaMetricsTo(blockForCaching);
-      cacheConf.getBlockCache().cacheBlock(
-          HFile.getBlockCacheKey(name, lastDataBlockOffset), blockForCaching);
+      doCacheOnWrite(lastDataBlockOffset);
     }
   }
 
@@ -263,17 +260,26 @@ public class HFileWriterV2 extends Abstr
         totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
 
         if (cacheThisBlock) {
-          // Cache this block on write.
-          HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
-          passSchemaMetricsTo(cBlock);
-          cacheConf.getBlockCache().cacheBlock(
-              HFile.getBlockCacheKey(name, offset), cBlock);
+          doCacheOnWrite(offset);
         }
       }
     }
   }
 
   /**
+   * Caches the last written HFile block.
+   * @param offset the offset of the block we want to cache. Used to determine
+   *          the cache key.
+   */
+  private void doCacheOnWrite(long offset) {
+    // Cache this block on write.
+    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
+    passSchemaMetricsTo(cacheFormatBlock);
+    cacheConf.getBlockCache().cacheBlock(
+        HFile.getBlockCacheKey(name, offset), cacheFormatBlock);
+  }
+
+  /**
    * Ready a new block for writing.
    *
    * @throws IOException
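
The extracted helper captures the three steps both call sites shared:
rebuild the just-written block in cache format (getBlockForCaching() returns
a copy suitable for the block cache rather than the raw on-disk bytes),
attach schema metrics, and insert it under a key derived from the file name
and block offset. The key composition is roughly the following sketch (exact
format is an assumption, not confirmed by this diff):

    // Assumed shape of HFile.getBlockCacheKey(name, offset): the block
    // cache is keyed per file and per block offset within that file.
    String cacheKey = name + "_" + offset;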

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1235882&r1=1235881&r2=1235882&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Wed Jan 25 19:41:07 2012
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -742,4 +743,14 @@ public class LruBlockCache implements Bl
     return fileNames;
   }
 
+  Map<BlockType, Integer> getBlockTypeCountsForTest() {
+    Map<BlockType, Integer> counts =
+        new EnumMap<BlockType, Integer>(BlockType.class);
+    for (CachedBlock cb : map.values()) {
+      BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
+      Integer count = counts.get(blockType);
+      counts.put(blockType, (count == null ? 0 : count) + 1);
+    }
+    return counts;
+  }
 }
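
The new package-private hook tallies cached blocks by type so tests can
assert on cache contents. Typical usage, mirroring the test added to
TestCacheOnWrite below:

    LruBlockCache cache = (LruBlockCache) new CacheConfig(conf).getBlockCache();
    Map<BlockType, Integer> counts = cache.getBlockTypeCountsForTest();
    // With this patch, a compaction should leave no DATA blocks behind:
    assertNull(counts.get(BlockType.DATA));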

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1235882&r1=1235881&r2=1235882&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Jan 25 19:41:07 2012
@@ -747,19 +747,28 @@ public class Store extends SchemaConfigu
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount)
   throws IOException {
-    return createWriterInTmp(maxKeyCount, this.compression);
+    return createWriterInTmp(maxKeyCount, this.compression, false);
   }
 
   /*
    * @param maxKeyCount
    * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
    */
   private StoreFile.Writer createWriterInTmp(int maxKeyCount,
-    Compression.Algorithm compression)
+    Compression.Algorithm compression, boolean isCompaction)
   throws IOException {
+    final CacheConfig writerCacheConf;
+    if (isCompaction) {
+      // Don't cache data on write on compactions.
+      writerCacheConf = new CacheConfig(cacheConf);
+      writerCacheConf.setCacheDataOnWrite(false);
+    } else {
+      writerCacheConf = cacheConf;
+    }
     StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
-        blocksize, compression, comparator, conf, cacheConf,
+        blocksize, compression, comparator, conf, writerCacheConf,
         family.getBloomFilterType(), maxKeyCount);
     // The store file writer's path does not include the CF name, so we need
     // to configure the HFile writer directly.
@@ -1428,8 +1437,8 @@ public class Store extends SchemaConfigu
         do {
           hasMore = scanner.next(kvs, this.compactionKVMax);
           if (writer == null && !kvs.isEmpty()) {
-            writer = createWriterInTmp(maxKeyCount,
-              this.compactionCompression);
+            writer = createWriterInTmp(maxKeyCount, this.compactionCompression,
+                true);
           }
           if (writer != null) {
             // output to writer:
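
This is the core of the fix: the compaction path hands the writer a private
CacheConfig copy with cache-on-write disabled, rather than mutating the
store's shared cacheConf, which would have turned cache-on-write off for
flushes as well. Condensed, the decision is:

    final CacheConfig writerCacheConf;
    if (isCompaction) {
      // Copy the store's cache settings, then override one flag:
      // compaction output is not assumed to be hot.
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      // Flushes keep whatever cache-on-write behavior was configured.
      writerCacheConf = cacheConf;
    }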

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java?rev=1235882&r1=1235881&r2=1235882&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java Wed Jan 25 19:41:07 2012
@@ -219,9 +219,13 @@ public class SchemaMetrics {
       256 - MORE_CFS_OMITTED_STR.length();
 
   // Global variables
-  /** All instances of this class */
+  /**
+   * Maps a string key consisting of table name and column family name, with
+   * table name optionally replaced with {@link #TOTAL_KEY} if per-table
+   * metrics are disabled, to an instance of this class.
+   */
   private static final ConcurrentHashMap<String, SchemaMetrics>
-      cfToMetrics = new ConcurrentHashMap<String, SchemaMetrics>();
+      tableAndFamilyToMetrics = new ConcurrentHashMap<String, SchemaMetrics>();
 
   /** Metrics for all tables and column families. */
-  // This has to be initialized after cfToMetrics.
+  // This has to be initialized after tableAndFamilyToMetrics.
@@ -317,14 +321,14 @@ public class SchemaMetrics {
     tableName = getEffectiveTableName(tableName);
 
     final String instanceKey = tableName + "\t" + cfName;
-    SchemaMetrics schemaMetrics = cfToMetrics.get(instanceKey);
+    SchemaMetrics schemaMetrics = tableAndFamilyToMetrics.get(instanceKey);
     if (schemaMetrics != null) {
       return schemaMetrics;
     }
 
     schemaMetrics = new SchemaMetrics(tableName, cfName);
-    SchemaMetrics existingMetrics = cfToMetrics.putIfAbsent(instanceKey,
-        schemaMetrics);
+    SchemaMetrics existingMetrics =
+        tableAndFamilyToMetrics.putIfAbsent(instanceKey, schemaMetrics);
     return existingMetrics != null ? existingMetrics : schemaMetrics;
   }
 
@@ -720,7 +724,7 @@ public class SchemaMetrics {
 
   public static Map<String, Long> getMetricsSnapshot() {
     Map<String, Long> metricsSnapshot = new TreeMap<String, Long>();
-    for (SchemaMetrics cfm : cfToMetrics.values()) {
+    for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
       for (String metricName : cfm.getAllMetricNames()) {
         long metricValue;
         if (isTimeVaryingKey(metricName)) {
@@ -781,7 +785,7 @@ public class SchemaMetrics {
     final Set<String> allKeys = new TreeSet<String>(oldMetrics.keySet());
     allKeys.addAll(newMetrics.keySet());
 
-    for (SchemaMetrics cfm : cfToMetrics.values()) {
+    for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
       for (String metricName : cfm.getAllMetricNames()) {
         if (metricName.startsWith(CF_PREFIX + CF_PREFIX)) {
           throw new AssertionError("Column family prefix used twice: " +
@@ -876,4 +880,16 @@ public class SchemaMetrics {
     useTableNameGlobally = useTableNameNew;
   }
 
+  /** Formats the given map of metrics in a human-readable way. */
+  public static String formatMetrics(Map<String, Long> metrics) {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+      if (sb.length() > 0) {
+        sb.append('\n');
+      }
+      sb.append(entry.getKey() + " : " + entry.getValue());
+    }
+    return sb.toString();
+  }
+
 }
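
formatMetrics pairs with the existing snapshot/diff helpers to log what a
single operation changed. Typical test usage, as in TestCacheOnWrite below:

    Map<String, Long> before = SchemaMetrics.getMetricsSnapshot();
    region.compactStores();
    Map<String, Long> delta =
        SchemaMetrics.diffMetrics(before, SchemaMetrics.getMetricsSnapshot());
    LOG.debug(SchemaMetrics.formatMetrics(delta));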

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1235882&r1=1235881&r2=1235882&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Wed Jan 25 19:41:07 2012
@@ -4487,26 +4487,27 @@ public class TestFromClientSide {
     assertEquals(++expectedBlockCount, cache.getBlockCount());
     assertEquals(expectedBlockHits, cache.getStats().getHitCount());
     assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
-    // compact, net minus on block, two hits, no misses
+    // compact, net minus two blocks, two hits, no misses
     System.out.println("Compacting");
     assertEquals(2, store.getNumberOfstorefiles());
     store.triggerMajorCompaction();
     region.compactStores();
     waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
     assertEquals(1, store.getNumberOfstorefiles());
-    assertEquals(--expectedBlockCount, cache.getBlockCount());
+    expectedBlockCount -= 2; // evicted two blocks, cached none
+    assertEquals(expectedBlockCount, cache.getBlockCount());
     expectedBlockHits += 2;
     assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
     assertEquals(expectedBlockHits, cache.getStats().getHitCount());
-    // read the row, same blocks, one hit no miss
+    // read the row, this should be a cache miss because we don't cache data
+    // blocks on compaction
     r = table.get(new Get(ROW));
     assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
     assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+    expectedBlockCount += 1; // cached one data block
     assertEquals(expectedBlockCount, cache.getBlockCount());
-    assertEquals(++expectedBlockHits, cache.getStats().getHitCount());
-    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
-    // no cache misses!
-    assertEquals(startBlockMiss, cache.getStats().getMissCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(++expectedBlockMiss, cache.getStats().getMissCount());
   }
 
   private void waitForStoreFileCount(Store store, int count, int timeout)
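
The updated accounting follows directly from the patch. Compacting the two
store files evicts their two cached data blocks and, unlike before, caches
nothing for the compacted output; the follow-up Get then misses, because the
new file's block is not in the cache, and caches one block on read:

    blocks cached:  N        before compaction (one per store file)
    after compact:  N - 2    both old blocks evicted, output NOT cached
    after the Get:  N - 1    one read miss, one data block cached
    (pre-patch: the output block was cached on write, so the count was
     N - 2 + 1 = N - 1 right after compaction and the Get was a hit)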

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1235882&r1=1235881&r2=1235882&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Wed Jan 25 19:41:07 2012
@@ -20,11 +20,16 @@
 
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -32,9 +37,18 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,7 +56,6 @@ import org.junit.experimental.categories
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import static org.junit.Assert.*;
 
 /**
  * Tests {@link HFile} cache-on-write functionality for the following block
@@ -70,6 +83,7 @@ public class TestCacheOnWrite {
   private static final int NUM_KV = 25000;
   private static final int INDEX_BLOCK_SIZE = 512;
   private static final int BLOOM_BLOCK_SIZE = 4096;
+  private static final BloomType BLOOM_TYPE = StoreFile.BloomType.ROWCOL;
 
   /** The number of valid key types possible in a store file */
   private static final int NUM_VALID_KEY_TYPES =
@@ -149,7 +163,7 @@ public class TestCacheOnWrite {
   }
 
   @Test
-  public void testCacheOnWrite() throws IOException {
+  public void testStoreFileCacheOnWrite() throws IOException {
     writeStoreFile();
     readStoreFile();
   }
@@ -215,7 +229,7 @@ public class TestCacheOnWrite {
         "test_cache_on_write");
     StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
         DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
-        cacheConf, StoreFile.BloomType.ROWCOL, NUM_KV);
+        cacheConf, BLOOM_TYPE, NUM_KV);
 
     final int rowLen = 32;
     for (int i = 0; i < NUM_KV; ++i) {
@@ -236,6 +250,56 @@ public class TestCacheOnWrite {
     storeFilePath = sfw.getPath();
   }
 
+  @Test
+  public void testNotCachingDataBlocksDuringCompaction() throws IOException {
+    // TODO: need to change this test if we add a cache size threshold for
+    // compactions, or if we implement some other kind of intelligent logic for
+    // deciding what blocks to cache-on-write on compaction.
+    final String table = "CompactionCacheOnWrite";
+    final String cf = "myCF";
+    final byte[] cfBytes = Bytes.toBytes(cf);
+    final int maxVersions = 3;
+    HRegion region = TEST_UTIL.createTestRegion(table, cf, compress,
+        BLOOM_TYPE, maxVersions, HColumnDescriptor.DEFAULT_BLOCKCACHE,
+        HFile.DEFAULT_BLOCKSIZE);
+    int rowIdx = 0;
+    long ts = EnvironmentEdgeManager.currentTimeMillis();
+    for (int iFile = 0; iFile < 5; ++iFile) {
+      for (int iRow = 0; iRow < 500; ++iRow) {
+        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + 
+            iRow;
+        Put p = new Put(Bytes.toBytes(rowStr));
+        ++rowIdx;
+        for (int iCol = 0; iCol < 10; ++iCol) {
+          String qualStr = "col" + iCol;
+          String valueStr = "value_" + rowStr + "_" + qualStr;
+          for (int iTS = 0; iTS < 5; ++iTS) {
+            p.add(cfBytes, Bytes.toBytes(qualStr), ts++,
+                Bytes.toBytes(valueStr));
+          }
+        }
+        region.put(p);
+      }
+      region.flushcache();
+    }
+    LruBlockCache blockCache =
+        (LruBlockCache) new CacheConfig(conf).getBlockCache();
+    blockCache.clearCache();
+    assertEquals(0, blockCache.getBlockTypeCountsForTest().size());
+    Map<String, Long> metricsBefore = SchemaMetrics.getMetricsSnapshot();
+    region.compactStores();
+    LOG.debug("compactStores() returned");
+    SchemaMetrics.validateMetricChanges(metricsBefore);
+    Map<String, Long> compactionMetrics = SchemaMetrics.diffMetrics(
+        metricsBefore, SchemaMetrics.getMetricsSnapshot());
+    LOG.debug(SchemaMetrics.formatMetrics(compactionMetrics));
+    Map<BlockType, Integer> blockTypesInCache =
+        blockCache.getBlockTypeCountsForTest();
+    LOG.debug("Block types in cache: " + blockTypesInCache);
+    assertNull(blockTypesInCache.get(BlockType.DATA));
+    region.close();
+    blockCache.shutdown();
+  }
 
   @org.junit.Rule
   public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =