Posted to commits@hbase.apache.org by li...@apache.org on 2014/01/08 20:18:24 UTC

svn commit: r1556609 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/io/hfile/bucket/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ test/java/org/apache/hadoop/hbase/io/hfile/ te...

Author: liyin
Date: Wed Jan  8 19:18:24 2014
New Revision: 1556609

URL: http://svn.apache.org/r1556609
Log:
[0.89-fb] [HBASE-10291] Per-column family L2 cache metrics

Author: arjen

Summary:
This diff adds per-column family L2 cache metrics. In order to make this happen, the following needed to change:

The L2 cache deals with raw block data. However, to keep the size metrics for different CFs and block types valid, the cache needs to know about the block type. In the BlockCache this is achieved by making HFileBlock implement SchemaAware and augmenting each HFileBlock with the appropriate data before adding it to the cache. To do the same for the L2 cache, RawHFileBlock was added and BucketEntry was made to implement SchemaAware.
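
In miniature, the size accounting works as in the following sketch; it mirrors the updateSizeMetrics() method added to BucketCache further down in this diff, where entry is the BucketEntry carrying the type and metrics it inherited from the RawHFileBlock:

    // Sketch of the per-CF size accounting, mirroring
    // BucketCache.updateSizeMetrics() below. A put adds the entry's on-disk
    // length to the per-CF, per-block-type metric; an eviction subtracts it.
    protected long updateSizeMetrics(BucketEntry entry, boolean eviction) {
      long delta = (eviction ? -1 : 1) * entry.getLength();
      SchemaMetrics metrics = entry.getSchemaMetrics();
      BlockType type = entry.getBlockType();
      if (metrics != null && type != null) {
        metrics.updateOnL2CachePutOrEvict(type.getCategory(), delta);
      }
      return realCacheSize.addAndGet(delta);
    }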

This led to an interesting problem, as the L2 cache may be used by readers and writers that do not have the SchemaAware data. To work around this, a lightweight facade wrapping CacheConfig and the L2 cache can be instantiated and passed down instead of the actual L2 cache. This L2CacheAgent is SchemaAware and can add the schema data to RawHFileBlocks before handing them over to the cache (see the sketch below). I feel the L2CacheAgent is still a work in progress, as it could potentially centralize the cache policy and allow all cache decisions to be removed from HFile and block readers and writers.
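
For illustration, a minimal sketch of the intended call pattern; the types and methods are the ones added in this diff, while hfileName, offset, blockType and onDiskBytes are hypothetical locals:

    // A reader or writer obtains the agent once and donates its schema
    // context to it (see HFileReaderV2/HFileWriterV2 below).
    L2CacheAgent agent = cacheConf.getL2CacheAgent();
    passSchemaMetricsTo(agent);

    // Write path: the agent stamps its schema data onto the raw block
    // before handing it to the underlying L2 cache.
    BlockCacheKey key = new BlockCacheKey(hfileName, offset);
    agent.cacheRawBlock(key, new RawHFileBlock(blockType, onDiskBytes));

    // Read path: returns null on a miss or when the L2 cache is disabled.
    byte[] raw = agent.getRawBlockBytes(key);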

Depends on D1112436

Test Plan: Added and ran the unit tests, deployed a jar to one of the shadow clusters and observed the stats for a while.

Reviewers: liyintang, aaiyer, manukranthk

Reviewed By: liyintang

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1112816

Task ID: 2567910

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java Wed Jan  8 19:18:24 2014
@@ -526,13 +526,17 @@ public class CacheConfig implements Conf
   }
 
   public boolean isL2CacheEnabled() {
-    return l2Cache != null && !l2Cache.isShutdown();
+    return l2Cache != null && l2Cache.isEnabled();
   }
 
   public L2Cache getL2Cache() {
     return l2Cache;
   }
 
+  public L2CacheAgent getL2CacheAgent() {
+    return new L2CacheAgent(this, l2Cache);
+  }
+
   @Override
   public void notifyOnChange(Configuration conf) {
     new CacheConfigBuilder(conf).update(this);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java Wed Jan  8 19:18:24 2014
@@ -1413,7 +1413,7 @@ public class HFileBlock extends SchemaCo
   public static class FSReaderV2 extends AbstractFSReader {
 
     /** L2 cache instance or null if l2 cache is disabled */
-    private final L2Cache l2Cache;
+    private final L2CacheAgent cacheAgent;
 
     /**
      * Name of the current hfile. Used to compose the key in for the
@@ -1437,9 +1437,9 @@ public class HFileBlock extends SchemaCo
         };
 
     public FSReaderV2(FSDataInputStream istream, Algorithm compressAlgo,
-        long fileSize, L2Cache l2Cache, String hfileNameForL2Cache) {
+        long fileSize, L2CacheAgent cacheAgent, String hfileNameForL2Cache) {
       super(istream, compressAlgo, fileSize);
-      this.l2Cache = l2Cache;
+      this.cacheAgent = cacheAgent;
       this.hfileNameForL2Cache = hfileNameForL2Cache;
     }
 
@@ -1532,9 +1532,6 @@ public class HFileBlock extends SchemaCo
               headerAndData.array(), headerAndData.arrayOffset()
                   + preReadHeaderSize, onDiskSizeWithHeader
                   - preReadHeaderSize, true, offset + preReadHeaderSize, options);
-          if (addToL2Cache) {
-            cacheBlockInL2Cache(offset, headerAndData.array());
-          }
           b = new HFileBlock(headerAndData);
           b.assumeUncompressed();
           b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
@@ -1542,6 +1539,10 @@ public class HFileBlock extends SchemaCo
 
           if (b.nextBlockOnDiskSizeWithHeader > 0)
             setNextBlockHeader(offset, b);
+
+          if (addToL2Cache) {
+            cacheRawBlockBytes(b.getBlockType(), offset, headerAndData.array());
+          }
         } else {
           // Allocate enough space to fit the next block's header too.
           byte[] onDiskBlock = new byte[onDiskSizeWithHeader + HEADER_SIZE];
@@ -1565,14 +1566,14 @@ public class HFileBlock extends SchemaCo
           }
           b.validateOnDiskSizeWithoutHeader(onDiskSizeWithoutHeader);
           b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
-          if (l2Cache != null && addToL2Cache) {
+          if (addToL2Cache && cacheAgent.isL2CacheEnabled()) {
             if (preReadHeaderSize > 0) {
               // If we plan to add block to L2 cache, we need to copy the
               // header information into the byte array so that it can be
               // cached in the L2 cache.
               System.arraycopy(header, 0, onDiskBlock, 0, preReadHeaderSize);
             }
-            cacheBlockInL2Cache(offset, onDiskBlock);
+            cacheRawBlockBytes(b.getBlockType(), offset, onDiskBlock);
           }
           DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
               onDiskBlock, HEADER_SIZE, onDiskSizeWithoutHeader));
@@ -1629,7 +1630,7 @@ public class HFileBlock extends SchemaCo
               b.uncompressedSizeWithoutHeader, true, offset + HEADER_SIZE,
               options);
           if (addToL2Cache) {
-            cacheBlockInL2Cache(offset, b.buf.array());
+            cacheRawBlockBytes(b.getBlockType(), offset, b.buf.array());
           }
           if (b.nextBlockOnDiskSizeWithHeader > 0) {
             setNextBlockHeader(offset, b);
@@ -1641,13 +1642,13 @@ public class HFileBlock extends SchemaCo
           b.nextBlockOnDiskSizeWithHeader = readAtOffset(compressedBytes,
               HEADER_SIZE, b.onDiskSizeWithoutHeader, true, offset
                   + HEADER_SIZE, options);
-          if (l2Cache != null && addToL2Cache) {
+          if (addToL2Cache && cacheAgent.isL2CacheEnabled()) {
             // If l2 cache is enabled, we need to copy the header bytes to
             // the compressed bytes array, so that they can be cached in the
             // L2 cache.
             System.arraycopy(headerBuf.array(), 0, compressedBytes, 0,
                 HEADER_SIZE);
-            cacheBlockInL2Cache(offset, compressedBytes);
+            cacheRawBlockBytes(b.getBlockType(), offset, compressedBytes);
           }
           DataInputStream dis = new DataInputStream(new ByteArrayInputStream(
               compressedBytes, HEADER_SIZE, b.onDiskSizeWithoutHeader));
@@ -1700,9 +1701,12 @@ public class HFileBlock extends SchemaCo
      * @param blockBytes The block's bytes as they appear on disk (i.e.,
      *                   correctly encoded and compressed)
      */
-    void cacheBlockInL2Cache(long offset, byte[] blockBytes) {
-      if (l2Cache != null) {
-        l2Cache.cacheRawBlock(hfileNameForL2Cache, offset, blockBytes);
+    private void cacheRawBlockBytes(BlockType type, long offset,
+                                    byte[] blockBytes) {
+      if (cacheAgent != null) {
+        BlockCacheKey cacheKey = new BlockCacheKey(hfileNameForL2Cache, offset);
+        RawHFileBlock rawBlock = new RawHFileBlock(type, blockBytes);
+        cacheAgent.cacheRawBlock(cacheKey, rawBlock);
       }
     }
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java Wed Jan  8 19:18:24 2014
@@ -772,7 +772,7 @@ public class HFileBlockIndex {
     private BlockCache blockCache;
 
     /** L2Cache, or null if cache-on-write is disabled */
-    private L2Cache l2Cache;
+    private L2CacheAgent cacheAgent;
 
     /** Name to use for computing cache keys */
     private String nameForCaching;
@@ -789,14 +789,15 @@ public class HFileBlockIndex {
      * @param blockWriter the block writer to use to write index blocks
      */
     public BlockIndexWriter(HFileBlock.Writer blockWriter,
-        BlockCache blockCache, L2Cache l2Cache, String nameForCaching) {
-      if (nameForCaching == null && (blockCache != null || l2Cache != null)) {
+        BlockCache blockCache, L2CacheAgent cacheAgent, String nameForCaching) {
+      if ((blockCache != null || cacheAgent != null)  &&
+              nameForCaching == null) {
           throw new IllegalArgumentException("If BlockCache OR L2Cache are  " +
               " not null, then nameForCaching must NOT be null");
       }
       this.blockWriter = blockWriter;
       this.blockCache = blockCache;
-      this.l2Cache = l2Cache;
+      this.cacheAgent = cacheAgent;
       this.nameForCaching = nameForCaching;
       this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
     }
@@ -945,16 +946,18 @@ public class HFileBlockIndex {
       byte[] curFirstKey = curChunk.getBlockKey(0);
       blockWriter.writeHeaderAndData(out);
 
+      HFileBlock blockForCaching = blockWriter.getBlockForCaching();
+      // The block type and SchemaConfigured data matter here, as this will be
+      // used by the cache to update block type specific metrics.
+      BlockCacheKey cacheKey = new BlockCacheKey(nameForCaching, beginOffset);
       if (blockCache != null) {
-        HFileBlock blockForCaching = blockWriter.getBlockForCaching();
         passSchemaMetricsTo(blockForCaching);
-        blockCache.cacheBlock(new BlockCacheKey(nameForCaching, beginOffset),
-                blockForCaching);
+        blockCache.cacheBlock(cacheKey, blockForCaching);
       }
-
-      if (l2Cache != null) {
-        l2Cache.cacheRawBlock(nameForCaching,  beginOffset,
-            blockWriter.getHeaderAndData());
+      if (cacheAgent != null) {
+        RawHFileBlock rawBlock = new RawHFileBlock(
+                blockForCaching.getBlockType(), blockWriter.getHeaderAndData());
+        cacheAgent.cacheRawBlock(cacheKey, rawBlock);
       }
 
       // Add intermediate index block size

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Wed Jan  8 19:18:24 2014
@@ -24,6 +24,7 @@ import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -234,8 +235,10 @@ public class HFileReaderV1 extends Abstr
           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
               cacheConf.shouldCacheBlockOnRead(effectiveCategory));
         if (cachedBlock != null) {
-          getSchemaMetrics().updateOnCacheHit(effectiveCategory,
-              SchemaMetrics.NO_COMPACTION);
+          getSchemaMetrics().updateOnBlockRead(effectiveCategory,
+                  SchemaMetrics.NO_COMPACTION,
+                  TimeUnit.NANOSECONDS.toMillis(
+                          System.nanoTime() - startTimeNs), true, false, false);
           return cachedBlock.getBufferWithoutHeader();
         }
         // Cache Miss, please load.
@@ -247,11 +250,12 @@ public class HFileReaderV1 extends Abstr
       passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.META);
 
-      long delta = System.nanoTime() - startTimeNs;
-      HFile.preadTimeNano.addAndGet(delta);
+      long deltaNs = System.nanoTime() - startTimeNs;
+      HFile.preadTimeNano.addAndGet(deltaNs);
       HFile.preadOps.incrementAndGet();
-      HFile.preadHistogram.addValue(delta);
-      getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false, delta);
+      HFile.preadHistogram.addValue(deltaNs);
+      getSchemaMetrics().updateOnBlockRead(effectiveCategory, false,
+              TimeUnit.NANOSECONDS.toMillis(deltaNs), false, false, false);
 
       // Cache the block
       if (cacheBlock && cacheConf.shouldCacheBlockOnRead(effectiveCategory)) {
@@ -266,8 +270,7 @@ public class HFileReaderV1 extends Abstr
   /**
    * Read in a file block.
    * @param block Index of block to read.
-   * @param pread Use positional read instead of seek+read (positional is
-   * better doing random reads whereas seek+read is better scanning).
+   * @param cacheBlock cache block if read from disk
    * @param isCompaction is this block being read as part of a compaction
    * @return Block wrapped in a ByteBuffer.
    * @throws IOException
@@ -283,6 +286,8 @@ public class HFileReaderV1 extends Abstr
         ", max: " + dataBlockIndexReader.getRootBlockCount());
     }
 
+    long startTimeNs = System.nanoTime();
+
     long offset = dataBlockIndexReader.getRootBlockOffset(block);
     BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
 
@@ -302,15 +307,16 @@ public class HFileReaderV1 extends Abstr
           if (kvContext != null) {
             kvContext.setObtainedFromCache(true);
           }
-          getSchemaMetrics().updateOnCacheHit(
-              cachedBlock.getBlockType().getCategory(), isCompaction);
+          getSchemaMetrics().updateOnBlockRead(
+                  cachedBlock.getBlockType().getCategory(), isCompaction,
+                  TimeUnit.NANOSECONDS.toMillis(
+                          System.nanoTime() - startTimeNs), true, false, false);
           return cachedBlock.getBufferWithoutHeader();
         }
         // Carry on, please load.
       }
 
       // Load block from filesystem.
-      long startTimeNs = System.nanoTime();
       long nextOffset;
 
       if (block == dataBlockIndexReader.getRootBlockCount() - 1) {
@@ -329,18 +335,19 @@ public class HFileReaderV1 extends Abstr
       passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.DATA);
 
-      long delta = System.nanoTime() - startTimeNs;
+      long deltaNs = System.nanoTime() - startTimeNs;
       if (isCompaction) {
-        HFile.preadCompactionTimeNano.addAndGet(delta);
-        HFile.preadCompactionHistogram.addValue(delta);
+        HFile.preadCompactionTimeNano.addAndGet(deltaNs);
+        HFile.preadCompactionHistogram.addValue(deltaNs);
         HFile.preadCompactionOps.incrementAndGet();
       } else {
-        HFile.preadTimeNano.addAndGet(delta);
-        HFile.preadHistogram.addValue(delta);
+        HFile.preadTimeNano.addAndGet(deltaNs);
+        HFile.preadHistogram.addValue(deltaNs);
         HFile.preadOps.incrementAndGet();
       }
-      getSchemaMetrics().updateOnCacheMiss(BlockCategory.DATA, isCompaction,
-          delta);
+      getSchemaMetrics().updateOnBlockRead(
+              hfileBlock.getBlockType().getCategory(), isCompaction,
+              TimeUnit.NANOSECONDS.toMillis(deltaNs), false, false, false);
       if (kvContext != null) {
         kvContext.setObtainedFromCache(false);
       }
@@ -390,9 +397,9 @@ public class HFileReaderV1 extends Abstr
     if (evictOnClose && cacheConf.isBlockCacheEnabled()) {
       int numEvicted = 0;
       for (int i = 0; i < dataBlockIndexReader.getRootBlockCount(); i++) {
-        if (cacheConf.getBlockCache().evictBlock(
-            new BlockCacheKey(name,
-                dataBlockIndexReader.getRootBlockOffset(i)))) {
+        BlockCacheKey key = new BlockCacheKey(name,
+                dataBlockIndexReader.getRootBlockOffset(i));
+        if (cacheConf.getBlockCache().evictBlock(key)) {
           numEvicted++;
         }
       }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Wed Jan  8 19:18:24 2014
@@ -88,6 +88,8 @@ public class HFileReaderV2 extends Abstr
    */
   private List<HFileBlock> loadOnOpenBlocks = new ArrayList<HFileBlock>();
 
+  private L2CacheAgent l2Cache;
+
   /**
    * Opens a HFile. You must load the index before you can use it by calling
    * {@link #loadFileInfo()}.
@@ -110,9 +112,13 @@ public class HFileReaderV2 extends Abstr
       throws IOException {
     super(path, trailer, fsdis, size, closeIStream, cacheConf, conf);
     trailer.expectVersion(2);
+    // Get a cache agent and set schema context.
+    l2Cache = cacheConf.getL2CacheAgent();
+    passSchemaMetricsTo(l2Cache);
+
     HFileBlock.FSReaderV2 fsBlockReaderV2 = new HFileBlock.FSReaderV2(fsdis,
         compressAlgo, fileSize,
-        cacheConf.isL2CacheEnabled() ? cacheConf.getL2Cache() : null,
+        cacheConf.isL2CacheEnabled() ? cacheConf.getL2CacheAgent() : null,
         cacheConf.isL2CacheEnabled() ? name : null);
     this.fsBlockReader = fsBlockReaderV2; // upcast
 
@@ -230,20 +236,22 @@ public class HFileReaderV2 extends Abstr
       if (cachedBlock != null) {
         // Return a distinct 'shallow copy' of the block,
         // so pos does not get messed by the scanner
-        getSchemaMetrics().updateOnCacheHit(BlockCategory.META, false);
+        getSchemaMetrics().updateOnBlockRead(BlockCategory.META, false,
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs),
+                true, false, false);
         return cachedBlock.getBufferWithoutHeader();
       }
       // Cache Miss, please load.
       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
-          blockSize, -1, cacheInL2);
+              blockSize, -1, cacheInL2);
       passSchemaMetricsTo(metaBlock);
 
       long deltaNs = System.nanoTime() - startTimeNs;
       HFile.preadTimeNano.addAndGet(deltaNs);
       HFile.preadHistogram.addValue(deltaNs);
       HFile.preadOps.incrementAndGet();
-      getSchemaMetrics().updateOnCacheMiss(BlockCategory.META, false,
-          TimeUnit.NANOSECONDS.toMillis(deltaNs));
+      getSchemaMetrics().updateOnBlockRead(BlockCategory.META, false,
+              TimeUnit.NANOSECONDS.toMillis(deltaNs), false, false, false);
 
       // Cache the block
       if (cacheBlock) {
@@ -322,7 +330,7 @@ public class HFileReaderV2 extends Abstr
       // update schema metrics
       getSchemaMetrics().updateOnBlockRead(
         cachedBlock.getBlockType().getCategory(), isCompaction,
-        System.currentTimeMillis() - startTime, cacheOnPreload, true);
+        System.currentTimeMillis() - startTime, true, false, cacheOnPreload);
       // update profiling data
       Call call = HRegionServer.callContext.get();
       ProfilingData pData = call == null ? null : call.getProfilingData();
@@ -344,14 +352,14 @@ public class HFileReaderV2 extends Abstr
         }
         getSchemaMetrics().updateOnBlockRead(
           cachedBlock.getBlockType().getCategory(), isCompaction,
-          System.currentTimeMillis() - startTime, cacheOnPreload, true);
+          System.currentTimeMillis() - startTime, true, false, cacheOnPreload);
         return cachedBlock;
       }
       // First, check if the block exists in L2 cache
       cachedBlock = null;
       try {
-        cachedBlock = getBlockFromL2Cache(name, dataBlockOffset,
-            expectedBlockType, isCompaction);
+        cachedBlock = getBlockFromL2Cache(cacheKey, expectedBlockType,
+                isCompaction);
       } catch (Throwable t) {
         // If exception is encountered when attempting to read from the L2
         // cache, we should go on to try to read from disk and log the
@@ -376,7 +384,7 @@ public class HFileReaderV2 extends Abstr
         }
         getSchemaMetrics().updateOnBlockRead(
           cachedBlock.getBlockType().getCategory(), isCompaction,
-          System.currentTimeMillis() - startTime, cacheOnPreload, true);
+          System.currentTimeMillis() - startTime, false, true, cacheOnPreload);
         // Return early if a block exists in the L2 cache
         return cachedBlock;
       }
@@ -384,7 +392,7 @@ public class HFileReaderV2 extends Abstr
       long startTimeNs = System.nanoTime();
       HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
           onDiskBlockSize, -1, cacheBlock && !isCompaction,
-          getReadOptions(isCompaction));
+              getReadOptions(isCompaction));
       hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock,
           isCompaction);
       validateBlockType(hfileBlock, expectedBlockType);
@@ -407,7 +415,7 @@ public class HFileReaderV2 extends Abstr
       }
       getSchemaMetrics().updateOnBlockRead(
        blockCategory, isCompaction,
-        System.currentTimeMillis() - startTime, cacheOnPreload, false);
+        System.currentTimeMillis() - startTime, false, false, cacheOnPreload);
 
       // Cache the block if necessary
       if (cacheOnPreload
@@ -503,10 +511,7 @@ public class HFileReaderV2 extends Abstr
    * block (i.e., compressed and  encoded byte array) from the L2 cache,
    * de-compress, decode, and then construct an in-memory representation of the
    * block.
-   * @param hfileName Name of the HFile that contains the block (used as part
-   *                  of the cache key)
-   * @param offset Offset in the HFile containing the block (used as another
-   *               part of the cache key)
+   * @param cacheKey the key of the block to be fetched from cache
    * @param expectedBlockType Expected type of the block
    * @param isCompaction Indicates if this is a compaction related read. This
    *                     value is passed along to
@@ -518,20 +523,19 @@ public class HFileReaderV2 extends Abstr
    *         offset in the L2 cache.
    * @throws IOException If we are unable to decompress and decode the block.
    */
-  public HFileBlock getBlockFromL2Cache(String hfileName, long offset,
+  public HFileBlock getBlockFromL2Cache(BlockCacheKey cacheKey,
       BlockType expectedBlockType, boolean isCompaction) throws IOException {
-    if (cacheConf.isL2CacheEnabled()) {
-      byte[] bytes = cacheConf.getL2Cache().getRawBlock(hfileName, offset);
-      if (bytes != null) {
-        HFileBlock hfileBlock = HFileBlock.fromBytes(bytes, compressAlgo,
-            includesMemstoreTS, offset);
-        hfileBlock = dataBlockEncoder.diskToCacheFormat(hfileBlock, isCompaction);
-        validateBlockType(hfileBlock, expectedBlockType);
-        passSchemaMetricsTo(hfileBlock);
-        return hfileBlock;
-      }
+    HFileBlock cachedBlock = null;
+    byte[] bytes = l2Cache.getRawBlockBytes(cacheKey);
+    if (bytes != null) {
+      cachedBlock = HFileBlock.fromBytes(bytes, compressAlgo,
+              includesMemstoreTS, cacheKey.getOffset());
+      cachedBlock = dataBlockEncoder.diskToCacheFormat(cachedBlock,
+              isCompaction);
+      validateBlockType(cachedBlock, expectedBlockType);
+      passSchemaMetricsTo(cachedBlock);
     }
-    return null;
+    return cachedBlock;
   }
 
   /**
@@ -601,13 +605,7 @@ public class HFileReaderV2 extends Abstr
       }
     }
 
-    if (cacheConf.isL2CacheEnabled() && evictL2OnClose) {
-      int numEvicted = cacheConf.getL2Cache().evictBlocksByHfileName(name);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("On close, file=" + name + " evicted=" + numEvicted
-            + " block(s) from L2 cache");
-      }
-    }
+    l2Cache.evictBlocksByHfileName(name, true);
     if (closeIStream && istream != null) {
       istream.close();
       istream = null;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Wed Jan  8 19:18:24 2014
@@ -79,6 +79,8 @@ public class HFileWriterV2 extends Abstr
   private final boolean includeMemstoreTS = true;
   private long maxMemstoreTS = 0;
 
+  private L2CacheAgent l2Cache;
+
   static class WriterFactoryV2 extends HFile.WriterFactory {
     WriterFactoryV2(Configuration conf, CacheConfig cacheConf) {
       super(conf, cacheConf);
@@ -103,6 +105,10 @@ public class HFileWriterV2 extends Abstr
     if (fsBlockWriter != null)
       throw new IllegalStateException("finishInit called twice");
 
+    // Get a schema aware L2 cache agent.
+    l2Cache = cacheConf.getL2CacheAgent();
+    passSchemaMetricsTo(l2Cache);
+
     // HFile filesystem-level (non-caching) block writer
     fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
         includeMemstoreTS);
@@ -111,11 +117,11 @@ public class HFileWriterV2 extends Abstr
     boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
     boolean cacheL2IndexesOnWrite = cacheConf.shouldL2CacheDataOnWrite();
     dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
-        cacheIndexesOnWrite ? cacheConf.getBlockCache(): null,
-        cacheL2IndexesOnWrite ? cacheConf.getL2Cache() : null,
+        cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, l2Cache,
         (cacheIndexesOnWrite || cacheL2IndexesOnWrite) ? name : null);
     dataBlockIndexWriter.setMaxChunkSize(
         HFileBlockIndex.getMaxChunkSize(conf));
+    passSchemaMetricsTo(dataBlockIndexWriter);
     inlineBlockWriters.add(dataBlockIndexWriter);
 
     // Meta data block index writer
@@ -163,13 +169,13 @@ public class HFileWriterV2 extends Abstr
     HFile.writeTimeNano.addAndGet(System.nanoTime() - startTimeNs);
     HFile.writeOps.incrementAndGet();
 
-    // If a write is succesfull, cached the written block in the L1 and L2
+    // If a write is successful, cache the written block in the L1 and L2
     // caches
     boolean cacheOnCompaction = cacheCurrentBlockForCompaction();
     if (cacheConf.shouldCacheDataOnFlush() || cacheOnCompaction) {
       doCacheOnWrite(lastDataBlockOffset);
     }
-    if (cacheConf.isL2CacheEnabled() && cacheConf.shouldL2CacheDataOnWrite()) {
+    if (cacheConf.shouldL2CacheDataOnWrite()) {
       doCacheInL2Cache(lastDataBlockOffset);
     }
   }
@@ -213,8 +219,7 @@ public class HFileWriterV2 extends Abstr
         if (cacheThisBlock) {
           doCacheOnWrite(offset);
         }
-        if (cacheConf.isL2CacheEnabled() &&
-            cacheConf.shouldL2CacheDataOnWrite()) {
+        if (cacheConf.shouldL2CacheDataOnWrite()) {
           doCacheInL2Cache(offset);
         }
       }
@@ -230,6 +235,7 @@ public class HFileWriterV2 extends Abstr
     final boolean isCompaction = false;
     HFileBlock cacheFormatBlock = blockEncoder.diskToCacheFormat(
         fsBlockWriter.getBlockForCaching(), isCompaction);
+    BlockCacheKey key = new BlockCacheKey(name, offset);
     passSchemaMetricsTo(cacheFormatBlock);
     cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(name, offset),
             cacheFormatBlock);
@@ -241,8 +247,13 @@ public class HFileWriterV2 extends Abstr
    * @param offset Offset at which the block has been written
    */
   private void doCacheInL2Cache(long offset) throws IOException {
-    cacheConf.getL2Cache().cacheRawBlock(name, offset,
-        fsBlockWriter.getHeaderAndData());
+    if (cacheConf.isL2CacheEnabled()) {
+      BlockCacheKey key = new BlockCacheKey(name, offset);
+      RawHFileBlock rawBlock = new RawHFileBlock(
+              fsBlockWriter.getBlockForCaching().getBlockType(),
+              fsBlockWriter.getHeaderAndData());
+      l2Cache.cacheRawBlock(key, rawBlock);
+    }
   }
 
   /**
@@ -304,7 +315,7 @@ public class HFileWriterV2 extends Abstr
    * Comparator passed on construction.
    *
    * @param kv KeyValue to add. Cannot be empty nor null.
-   * @param appendForCompaction Whether the KV was read from cache or not
+   * @param cv Whether the KV was read from cache or not
    * @throws IOException
    */
   @Override
@@ -339,7 +350,7 @@ public class HFileWriterV2 extends Abstr
    * @param value
    * @param voffset
    * @param vlength
-   * @param KeyValueContext
+   * @param cv
    * @throws IOException
    */
   private void append(final long memstoreTS, final byte[] key,

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCache.java Wed Jan  8 19:18:24 2014
@@ -38,30 +38,40 @@ public class L2BucketCache implements L2
   }
 
   @Override
-  public byte[] getRawBlock(String hfileName, long dataBlockOffset) {
+  public byte[] getRawBlockBytes(BlockCacheKey key) {
     long startTimeNs = System.nanoTime();
-    BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset);
-    byte[] fromCache = bucketCache.getBlock(cacheKey, true);
+    byte[] fromCache = bucketCache.getBlock(key, true);
     if (LOG.isTraceEnabled()) {
       // Log elapsed time to retrieve a block from the cache
       long elapsedNs = System.nanoTime() - startTimeNs;
-      LOG.trace("getRawBlock() " + (fromCache == null ?"MISS" : "HIT") +
-          " on hfileName=" + hfileName + ", offset=" + dataBlockOffset +
-          " in " +  elapsedNs + " ns.");
+      LOG.trace("getRawBlock() " + (fromCache == null ? "MISS" : "HIT") +
+          " on hfileName=" + key.getHfileName() +
+          ", offset=" + key.getOffset() + " in " +  elapsedNs + " ns.");
     }
     return fromCache;
   }
 
   @Override
-  public void cacheRawBlock(String hfileName, long dataBlockOffset, byte[] block) {
+  public boolean cacheRawBlock(BlockCacheKey key, RawHFileBlock block) {
     long startTimeNs = System.nanoTime();
-    BlockCacheKey cacheKey = new BlockCacheKey(hfileName, dataBlockOffset);
-    bucketCache.cacheBlock(cacheKey, block);
+    boolean cached = bucketCache.cacheBlock(key, block);
     if (LOG.isTraceEnabled()) {
       long elapsedNs = System.nanoTime() - startTimeNs;
-      LOG.trace("cacheRawBlock() on hfileName=" + hfileName + ", offset=" +
-          dataBlockOffset + " in " + elapsedNs + " ns.");
+      LOG.trace("cacheRawBlock() on hfileName=" + key.getHfileName() +
+          ", offset=" + key.getOffset() + " in " + elapsedNs + " ns.");
     }
+    return cached;
+  }
+
+  @Override
+  public boolean evictRawBlock(BlockCacheKey cacheKey) {
+    long startTimeNs = System.nanoTime();
+    boolean evicted = bucketCache.evictBlock(cacheKey);
+    if (LOG.isTraceEnabled()) {
+      long elapsedNs = System.nanoTime() - startTimeNs;
+      LOG.trace("evictBlock() of " + cacheKey + " in " + elapsedNs + " ns.");
+    }
+    return evicted;
   }
 
   @Override
@@ -77,11 +87,10 @@ public class L2BucketCache implements L2
   }
 
   @Override
-  public boolean isShutdown() {
-    return !bucketCache.isEnabled();
+  public boolean isEnabled() {
+    return bucketCache.isEnabled();
   }
 
-  @Override
   public void shutdown() {
     bucketCache.shutdown();
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2BucketCacheFactory.java Wed Jan  8 19:18:24 2014
@@ -64,7 +64,7 @@ public class L2BucketCacheFactory implem
   }
 
   /** Cached instance of the L2Cache or null if not initialized */
-  private L2Cache l2Cache;
+  private L2BucketCache l2Cache;
 
   // Allows to short circuit getL2Cache() calls if the cache is disabled
   private boolean l2CacheDisabled;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2Cache.java Wed Jan  8 19:18:24 2014
@@ -28,37 +28,44 @@ public interface L2Cache {
   /**
    * Retrieve a block from the L2Cache. The block is retrieved as a byte
    * array, in the same exact format as it is stored on disk.
-   * @param hfileName Filename associated with the block
-   * @param dataBlockOffset Offset in the file
-   * @return
+   * @param cacheKey Key associated with the block
+   * @return raw byte array representing the block if present in the cache,
+   *    null otherwise
    */
-  public byte[] getRawBlock(String hfileName, long dataBlockOffset);
+  public byte[] getRawBlockBytes(BlockCacheKey cacheKey);
 
 
   /**
    * Add a block to the L2Cache. The block must be represented by a
    * byte array identical to what would be written to disk.
-   * @param hfileName Filename associated with the block
-   * @param dataBlockOffset Offset in the file
-   * @param rawBlock The exact byte representation of the block
+   * @param cacheKey key associated with the block
+   * @param block the raw HFileBlock to be cached
+   * @return true if the block was cached, false otherwise
    */
-  public void cacheRawBlock(String hfileName, long dataBlockOffset,
-      byte[] rawBlock);
+  public boolean cacheRawBlock(BlockCacheKey cacheKey, RawHFileBlock block);
+
+  /**
+   * Evict a block from the L2Cache.
+   * @param cacheKey Key associated with the block to be evicted
+   * @return true if the block was evicted, false otherwise
+   */
+  public boolean evictRawBlock(BlockCacheKey cacheKey);
 
   /**
    * Evict all blocks matching a given filename. This operation should be
    * efficient and can be called on each close of a store file.
    * @param hfileName Filename whose blocks to evict
+   * @return the number of evicted blocks
    */
   public int evictBlocksByHfileName(String hfileName);
 
   /**
-   * @return true if the cache has been shutdown
+   * @return true if the cache is enabled, false otherwise
    */
-  public boolean isShutdown();
+  public boolean isEnabled();
 
   /**
-   * Shutdown the cache
+   * Shutdown the L2 cache.
    */
   public void shutdown();
 }

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java?rev=1556609&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/L2CacheAgent.java Wed Jan  8 19:18:24 2014
@@ -0,0 +1,49 @@
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+
+/**
+ * {@link L2CacheAgent} wraps around a {@link CacheConfig} and {@link L2Cache}
+ * instance and is {@link SchemaConfigured}. As such it will make sure schema
+ * data is added to blocks going into the cache, allowing the appropriate
+ * metrics to be updated throughout the blocks lifecycle. Further more
+ * {@link L2CacheAgent} would allow for making schema aware cache decisions,
+ * removing the need for individual block writers to deal with cache policies.
+ */
+public class L2CacheAgent extends SchemaConfigured {
+  private CacheConfig cacheConfig;
+  private L2Cache l2Cache;
+
+  public L2CacheAgent(final CacheConfig cacheConfig,
+                      final L2Cache l2Cache) {
+    this.cacheConfig = cacheConfig;
+    this.l2Cache = l2Cache;
+  }
+
+  public byte[] getRawBlockBytes(BlockCacheKey cacheKey) {
+    return isL2CacheEnabled() ? l2Cache.getRawBlockBytes(cacheKey) : null;
+  }
+
+  public void cacheRawBlock(BlockCacheKey cacheKey, RawHFileBlock rawBlock) {
+    if (isL2CacheEnabled()) {
+      passSchemaMetricsTo(rawBlock);
+      l2Cache.cacheRawBlock(cacheKey, rawBlock);
+    }
+  }
+
+  public boolean evictRawBlock(BlockCacheKey cacheKey) {
+    return isL2CacheEnabled() && l2Cache.evictRawBlock(cacheKey);
+  }
+
+  public int evictBlocksByHfileName(String hfileName, boolean isClose) {
+    // If the HFile was closed but the policy is not to evict on close, skip
+    // the eviction.
+    if (isClose && !cacheConfig.shouldL2EvictOnClose()) return 0;
+    return isL2CacheEnabled() ?
+            l2Cache.evictBlocksByHfileName(hfileName) : 0;
+  }
+
+  public boolean isL2CacheEnabled() {
+    return l2Cache != null && l2Cache.isEnabled();
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Wed Jan  8 19:18:24 2014
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.LruHashMap;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java?rev=1556609&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/RawHFileBlock.java Wed Jan  8 19:18:24 2014
@@ -0,0 +1,43 @@
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.util.ClassSize;
+
+/**
+ * A representation of a raw {@link HFileBlock}, used to pass a small amount
+ * of metadata around with the raw byte array. This is primarily useful in the
+ * context of making policy decisions and updating metrics when caching raw
+ * block data in L2 and L3 caches.
+ */
+public class RawHFileBlock extends SchemaConfigured implements Cacheable {
+  private final BlockType type;
+  private final byte[] data;
+
+  public static final long RAW_HFILE_BLOCK_OVERHEAD =
+          SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+          // type and data references.
+          2 * ClassSize.REFERENCE;
+
+  public RawHFileBlock(BlockType type, byte[] data) {
+    this.type = type;
+    this.data = data;
+  }
+
+  public RawHFileBlock(final HFileBlock block) {
+    this(block.getBlockType(), block.getBufferWithHeader().array());
+  }
+
+  @Override
+  public BlockType getBlockType() {
+    return type;
+  }
+
+  public byte[] getData() {
+    return data;
+  }
+
+  @Override
+  public long heapSize() {
+    return ClassSize.align(RAW_HFILE_BLOCK_OVERHEAD);
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java Wed Jan  8 19:18:24 2014
@@ -26,24 +26,15 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.CachedBlock;
-import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.io.hfile.*;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.IdLock;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
+import org.apache.hadoop.hbase.util.*;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -252,43 +243,47 @@ public class BucketCache implements Heap
   /**
    * Cache the block with the specified name and buffer.
    * @param cacheKey block's cache key
-   * @param buf block buffer
+   * @param block Raw HFile block
+   * @return true if the block was cached, false otherwise
    */
-  public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) {
-    cacheBlock(cacheKey, buf, false);
+  public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block) {
+    return cacheBlock(cacheKey, block, false);
   }
 
   /**
    * Cache the block with the specified name and buffer.
    * @param cacheKey block's cache key
-   * @param cachedItem block buffer
+   * @param block Raw HFile block
    * @param inMemory if block is in-memory
+   * @return true if the block was cached, false otherwise
    */
-  public void cacheBlock(BlockCacheKey cacheKey, byte[] cachedItem, boolean inMemory) {
-    cacheBlockWithWait(cacheKey, cachedItem, inMemory, wait_when_cache);
+  public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block,
+                            boolean inMemory) {
+    return cacheBlockWithWait(cacheKey, block, inMemory, wait_when_cache);
   }
 
   /**
    * Cache the block to ramCache
    * @param cacheKey block's cache key
-   * @param cachedItem block buffer
+   * @param block Raw HFile block
    * @param inMemory if block is in-memory
    * @param wait if true, blocking wait when queue is full
+   * @return true if the block was cached, false otherwise
    */
-  public void cacheBlockWithWait(BlockCacheKey cacheKey, byte[] cachedItem,
+  public boolean cacheBlockWithWait(BlockCacheKey cacheKey, RawHFileBlock block,
       boolean inMemory, boolean wait) {
     if (!cacheEnabled)
-      return;
+      return false;
 
     if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey))
-      return;
+      return true;
 
     /*
      * Stuff the entry into the RAM cache so it can get drained to the
      * persistent store
      */
-    RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem,
-        accessCount.incrementAndGet(), inMemory);
+    RAMQueueEntry re = new RAMQueueEntry(cacheKey, block,
+            accessCount.incrementAndGet(), inMemory);
     ramCache.put(cacheKey, re);
     int queueNum = (cacheKey.hashCode() & 0x7FFFFFFF) % writerQueues.size();
     BlockingQueue<RAMQueueEntry> bq = writerQueues.get(queueNum);
@@ -308,12 +303,12 @@ public class BucketCache implements Heap
       failedBlockAdditions.incrementAndGet();
     } else {
       this.blockNumber.incrementAndGet();
-      this.heapSize.addAndGet(cachedItem.length);
+      this.heapSize.addAndGet(re.getRawHFileBlock().getData().length);
       blocksByHFile.put(cacheKey.getHfileName(), cacheKey);
     }
+    return successfulAddition;
   }
 
-
   /**
    * Get the buffer of the block with the specified key.
    * @param key block's cache key
@@ -337,7 +332,7 @@ public class BucketCache implements Heap
     if (re != null) {
       cacheStats.hit(caching);
       re.access(accessCount.incrementAndGet());
-      return re.getData();
+      return re.getRawHFileBlock().getData();
     }
     BucketEntry bucketEntry = backingMap.get(key);
     if(bucketEntry!=null) {
@@ -382,7 +377,8 @@ public class BucketCache implements Heap
     RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
     if (removedBlock != null) {
       this.blockNumber.decrementAndGet();
-      this.heapSize.addAndGet(-1 * removedBlock.getData().length);
+      this.heapSize.addAndGet(
+              -1 * removedBlock.getRawHFileBlock().getData().length);
     }
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry == null) { return false; }
@@ -391,7 +387,7 @@ public class BucketCache implements Heap
       lockEntry = offsetLock.getLockEntry(bucketEntry.offset());
       if (bucketEntry.equals(backingMap.remove(cacheKey))) {
         bucketAllocator.freeBlock(bucketEntry.offset());
-        realCacheSize.addAndGet(-1 * bucketEntry.getLength());
+        updateSizeMetrics(bucketEntry, true);
         blocksByHFile.remove(cacheKey.getHfileName(), cacheKey);
         if (removedBlock == null) {
           this.blockNumber.decrementAndGet();
@@ -411,6 +407,17 @@ public class BucketCache implements Heap
     return true;
   }
 
+  protected long updateSizeMetrics(BucketEntry entry, boolean eviction) {
+    long delta = (eviction ? -1 : 1) * entry.getLength();
+    SchemaMetrics metrics = entry.getSchemaMetrics();
+    BlockType type = entry.getBlockType();
+    //LOG.info("updateSizeMetrics: " + type + ", delta: " + delta);
+    if (metrics != null && type != null) {
+      metrics.updateOnL2CachePutOrEvict(type.getCategory(), delta);
+    }
+    return realCacheSize.addAndGet(delta);
+  }
+
   /*
    * Statistics thread.  Periodically prints the cache statistics to the log.
    */
@@ -666,6 +673,25 @@ public class BucketCache implements Heap
       LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
     }
 
+    private BucketEntry writeToCache(final RAMQueueEntry ramEntry)
+            throws CacheFullException, IOException, BucketAllocatorException {
+      int len = ramEntry.getRawHFileBlock().getData().length;
+      if (len == 0) {
+        return null;
+      }
+      long offset = bucketAllocator.allocateBlock(len);
+      BucketEntry bucketEntry = new BucketEntry(offset, ramEntry);
+      try {
+        ioEngine.write(ramEntry.getRawHFileBlock().getData(), offset);
+      } catch (IOException ioe) {
+        // free it in bucket allocator
+        bucketAllocator.freeBlock(offset);
+        throw ioe;
+      }
+      updateSizeMetrics(bucketEntry, false);
+      return bucketEntry;
+    }
+
     /**
      * Flush the entries in ramCache to IOEngine and add bucket entry to
      * backingMap
@@ -686,8 +712,7 @@ public class BucketCache implements Heap
             LOG.warn("Couldn't get the entry from RAM queue, who steals it?");
             continue;
           }
-          BucketEntry bucketEntry = ramEntry.writeToCache(ioEngine,
-              bucketAllocator, realCacheSize);
+          BucketEntry bucketEntry = writeToCache(ramEntry);
           ramEntries[done] = ramEntry;
           bucketEntries[done++] = bucketEntry;
           if (ioErrorStartTime > 0) {
@@ -730,7 +755,8 @@ public class BucketCache implements Heap
         }
         RAMQueueEntry ramCacheEntry = ramCache.remove(ramEntries[i].getKey());
         if (ramCacheEntry != null) {
-          heapSize.addAndGet(-1 * ramEntries[i].getData().length);
+          heapSize.addAndGet(
+                  -1 * ramEntries[i].getRawHFileBlock().getData().length);
         }
       }
 
@@ -864,15 +890,30 @@ public class BucketCache implements Heap
    * up the long. Doubt we'll see devices this big for ages. Offsets are divided
    * by 256. So 5 bytes gives us 256TB or so.
    */
-  static class BucketEntry implements Serializable, Comparable<BucketEntry> {
+  static class BucketEntry implements Serializable, Comparable<BucketEntry>,
+          Cacheable {
     private static final long serialVersionUID = -6741504807982257534L;
     private int offsetBase;
     private int length;
     private byte offset1;
     private volatile long accessTime;
     private CachedBlock.BlockPriority priority;
+    private BlockType type;
+    private SchemaMetrics metrics;
 
-    BucketEntry(long offset, int length, long accessTime, boolean inMemory) {
+    public final static long BUCKET_ENTRY_OVERHEAD =
+            ClassSize.OBJECT +
+            // this.offsetBase, this.length
+            2 * Bytes.SIZEOF_INT +
+            // this.offset1
+            Bytes.SIZEOF_BYTE +
+            // this.accessTime
+            Bytes.SIZEOF_LONG +
+            // this.priority
+            ClassSize.REFERENCE;
+
+    BucketEntry(long offset, int length, long accessTime, boolean inMemory,
+                BlockType type, SchemaMetrics metrics) {
       setOffset(offset);
       this.length = length;
       this.accessTime = accessTime;
@@ -881,6 +922,15 @@ public class BucketCache implements Heap
       } else {
         this.priority = CachedBlock.BlockPriority.SINGLE;
       }
+      this.type = type;
+      this.metrics = metrics;
+    }
+
+    BucketEntry(long offset, RAMQueueEntry ramEntry) {
+      this(offset, ramEntry.getRawHFileBlock().getData().length,
+              ramEntry.getAccessTime(), ramEntry.isInMemory(),
+              ramEntry.getRawHFileBlock().getBlockType(),
+              ramEntry.getRawHFileBlock().getSchemaMetrics());
     }
 
     long offset() { // Java has no unsigned numbers
@@ -924,6 +974,21 @@ public class BucketCache implements Heap
     public boolean equals(Object that) {
       return this == that;
     }
+
+    @Override
+    public long heapSize() {
+      return ClassSize.align(BUCKET_ENTRY_OVERHEAD);
+    }
+
+    @Override
+    public BlockType getBlockType() {
+      return type;
+    }
+
+    @Override
+    public SchemaMetrics getSchemaMetrics() {
+      return metrics;
+    }
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/RAMQueueEntry.java Wed Jan  8 19:18:24 2014
@@ -20,60 +20,60 @@
 
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.io.hfile.RawHFileBlock;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 
 /**
  * Block Entry stored in the memory with key,data and so on
  */
-class RAMQueueEntry {
+class RAMQueueEntry implements HeapSize {
   private BlockCacheKey key;
-  private byte[] data;
+  private RawHFileBlock block;
   private long accessTime;
   private boolean inMemory;
 
-  public RAMQueueEntry(BlockCacheKey bck, byte[] data, long accessTime,
-      boolean inMemory) {
+  public final static long RAM_QUEUE_ENTRY_OVERHEAD =
+          ClassSize.OBJECT +
+          // key, block
+          2 * ClassSize.REFERENCE +
+          // accessTime
+          Bytes.SIZEOF_LONG +
+          // inMemory
+          Bytes.SIZEOF_BOOLEAN;
+
+  public RAMQueueEntry(BlockCacheKey bck, RawHFileBlock block, long accessTime,
+                       boolean inMemory) {
     this.key = bck;
-    this.data = data;
+    this.block = block;
     this.accessTime = accessTime;
     this.inMemory = inMemory;
   }
 
-  public byte[] getData() {
-    return data;
-  }
-
   public BlockCacheKey getKey() {
     return key;
   }
 
+  public RawHFileBlock getRawHFileBlock() {
+    return block;
+  }
+
+  public long getAccessTime() {
+    return accessTime;
+  }
+
+  public boolean isInMemory() {
+    return inMemory;
+  }
+
   public void access(long accessTime) {
     this.accessTime = accessTime;
   }
 
-  public BucketCache.BucketEntry writeToCache(final IOEngine ioEngine,
-      final BucketAllocator bucketAllocator,
-      final AtomicLong realCacheSize) throws CacheFullException, IOException,
-      BucketAllocatorException {
-    int len = data.length;
-    if (len == 0) {
-      return null;
-    }
-    long offset = bucketAllocator.allocateBlock(len);
-    BucketCache.BucketEntry bucketEntry = new BucketCache.BucketEntry(offset, len, accessTime,
-        inMemory);
-    try {
-      ioEngine.write(data, offset);
-    } catch (IOException ioe) {
-      // free it in bucket allocator
-      bucketAllocator.freeBlock(offset);
-      throw ioe;
-    }
-
-    realCacheSize.addAndGet(len);
-    return bucketEntry;
+  @Override
+  public long heapSize() {
+    return ClassSize.align(RAM_QUEUE_ENTRY_OVERHEAD);
   }
 }

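Stripped of writeToCache, RAMQueueEntry is now a plain value object whose
heapSize() reports only its aligned shallow overhead; the wrapped payload is
accounted for separately (realCacheSize in the old writeToCache). A small
sketch using only constructors visible in this commit:

    // Shallow-overhead illustration; the 4 KB payload is deliberately not
    // part of heapSize().
    RawHFileBlock raw = new RawHFileBlock(BlockType.DATA, new byte[4096]);
    RAMQueueEntry entry = new RAMQueueEntry(
        new BlockCacheKey("somehfile", 0L), raw,
        System.currentTimeMillis(), false);
    long shallow = entry.heapSize(); // ClassSize.align(RAM_QUEUE_ENTRY_OVERHEAD)
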
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java Wed Jan  8 19:18:24 2014
@@ -151,8 +151,8 @@ public class SchemaConfigured implements
    * current table and column family name, and the associated collection of
    * metrics.
    */
-  public void passSchemaMetricsTo(SchemaConfigured block) {
-    SchemaConfigured upcast = block;  // need this to assign private fields
+  public void passSchemaMetricsTo(SchemaConfigured that) {
+    SchemaConfigured upcast = that;  // need this to assign private fields
     upcast.tableName = tableName;
     upcast.cfName = cfName;
     upcast.schemaMetrics = schemaMetrics;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java Wed Jan  8 19:18:24 2014
@@ -114,20 +114,30 @@ public class SchemaMetrics {
 
   public static enum BlockMetricType {
     READ_TIME("Read", COMPACTION_AWARE_METRIC_FLAG | TIME_VARYING_METRIC_FLAG),
+
     READ_COUNT("BlockReadCnt", COMPACTION_AWARE_METRIC_FLAG),
     CACHE_HIT("BlockReadCacheHitCnt", COMPACTION_AWARE_METRIC_FLAG),
     CACHE_MISS("BlockReadCacheMissCnt", COMPACTION_AWARE_METRIC_FLAG),
 
-    PRELOAD_CACHE_HIT("PreloadCacheHitCnt",COMPACTION_AWARE_METRIC_FLAG),
-    PRELOAD_CACHE_MISS("PreloadCacheMissCnt",COMPACTION_AWARE_METRIC_FLAG),
-    PRELOAD_READ_TIME("PreloadReadTime",
-            COMPACTION_AWARE_METRIC_FLAG | TIME_VARYING_METRIC_FLAG),
-    
     CACHE_SIZE("blockCacheSize", PERSISTENT_METRIC_FLAG),
     UNENCODED_CACHE_SIZE("blockCacheUnencodedSize", PERSISTENT_METRIC_FLAG),
     CACHE_NUM_BLOCKS("cacheNumBlocks", PERSISTENT_METRIC_FLAG),
     CACHED("blockCacheNumCached"),
-    EVICTED("blockCacheNumEvicted");
+    EVICTED("blockCacheNumEvicted"),
+
+    L2_READ_COUNT("L2ReadCnt", COMPACTION_AWARE_METRIC_FLAG),
+    L2_CACHE_HIT("L2CacheHitCnt", COMPACTION_AWARE_METRIC_FLAG),
+    L2_CACHE_MISS("L2CacheMissCnt", COMPACTION_AWARE_METRIC_FLAG),
+
+    L2_CACHE_NUM("L2CacheNumBlocks", PERSISTENT_METRIC_FLAG),
+    L2_CACHE_SIZE("L2CacheSize", PERSISTENT_METRIC_FLAG),
+    L2_CACHED("L2CacheNumCached"),
+    L2_EVICTED("L2CacheNumEvicted"),
+
+    PRELOAD_CACHE_HIT("PreloadCacheHitCnt", COMPACTION_AWARE_METRIC_FLAG),
+    PRELOAD_CACHE_MISS("PreloadCacheMissCnt", COMPACTION_AWARE_METRIC_FLAG),
+    PRELOAD_READ_TIME("PreloadReadTime",
+            COMPACTION_AWARE_METRIC_FLAG | TIME_VARYING_METRIC_FLAG);
 
     private final String metricStr;
     private final int flags;
@@ -250,7 +260,7 @@ public class SchemaMetrics {
       StoreMetricType.values().length;
 
   /** Conf key controlling whether we include table name in metric names */
-  private static final String SHOW_TABLE_NAME_CONF_KEY =
+  public static final String SHOW_TABLE_NAME_CONF_KEY =
       "hbase.metrics.showTableName";
 
   private static final String WORD_BOUNDARY_RE_STR = "\\b";
@@ -522,70 +532,76 @@ public class SchemaMetrics {
    * @param blockCategory category of the block read
    * @param isCompaction whether this is a compaction read or not
    * @param timeMs time taken to read the block
+   * @param l1Cached whether this block was found in the L1 block cache
+   * @param l2Cached whether this block was found in the L2 cache
    * @param preload whether this is a preloaded block or not
-   * @param obtainedFromCache whether the block is found in cache or not
    */
   public void updateOnBlockRead(BlockCategory blockCategory,
-      boolean isCompaction, long timeMs, boolean preload,
-      boolean obtainedFromCache) {
-    if (obtainedFromCache) {
-      if (!preload) {
-        updateOnCacheHit(blockCategory, isCompaction, timeMs);
-      } else {
-        updateOnPreloadCacheHit(blockCategory, isCompaction, timeMs);
-      }
+      boolean isCompaction, long timeMs, boolean l1Cached, boolean l2Cached,
+      boolean preload) {
+    addToReadTime(blockCategory, isCompaction, timeMs);
+    if (l1Cached || l2Cached) {
+      if (l1Cached) updateOnCacheHit(blockCategory, isCompaction);
+      if (l2Cached) updateOnL2CacheHit(blockCategory, isCompaction);
+      if (preload) updateOnPreloadCacheHit(blockCategory, isCompaction, timeMs);
     } else {
-      if (!preload) {
-        updateOnCacheMiss(blockCategory, isCompaction, timeMs);
-      } else {
-        updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs);
-      }
+      updateOnCacheMiss(blockCategory, isCompaction);
+      updateOnL2CacheMiss(blockCategory, isCompaction);
+      if (preload) updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs);
     }
-  }
 
-  /**
-   * Updates the number of hits and the total number of block reads on a block cache hit.
-   */
-  public void updateOnCacheHit(BlockCategory blockCategory,
-      boolean isCompaction) {
-    blockCategory.expectSpecific();
-    incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_HIT);
-    incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT);
     if (this != ALL_SCHEMA_METRICS) {
-      ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction);
+      ALL_SCHEMA_METRICS.updateOnBlockRead(blockCategory, isCompaction, timeMs,
+              l1Cached, l2Cached, preload);
     }
   }
+
   /**
    * Updates the number of hits and the total number of block reads on a block
    * cache hit.
    */
   public void updateOnCacheHit(BlockCategory blockCategory,
-      boolean isCompaction, long deltaMs) {
+      boolean isCompaction) {
     blockCategory.expectSpecific();
     incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_HIT);
     incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT);
-    addToReadTime(blockCategory, isCompaction, deltaMs);
-    if (this != ALL_SCHEMA_METRICS) {
-      ALL_SCHEMA_METRICS.updateOnCacheHit(blockCategory, isCompaction, deltaMs);
-    }
   }
 
   /**
-   * Updates read time, the number of misses, and the total number of block
-   * reads on a block cache miss.
+   * Updates the number of misses and the total number of block reads on a
+   * block cache miss.
    */
-  public void updateOnCacheMiss(BlockCategory blockCategory,
-      boolean isCompaction, long timeMs) {
+  private void updateOnCacheMiss(BlockCategory blockCategory,
+                                 boolean isCompaction) {
     blockCategory.expectSpecific();
-    addToReadTime(blockCategory, isCompaction, timeMs);
     incrNumericMetric(blockCategory, isCompaction, BlockMetricType.CACHE_MISS);
     incrNumericMetric(blockCategory, isCompaction, BlockMetricType.READ_COUNT);
-    if (this != ALL_SCHEMA_METRICS) {
-      ALL_SCHEMA_METRICS.updateOnCacheMiss(blockCategory, isCompaction,
-          timeMs);
-    }
   }
-  
+
+  /**
+   * Updates the number of hits and the total number of block reads on an L2
+   * cache hit.
+   */
+  private void updateOnL2CacheHit(BlockCategory blockCategory,
+                                 boolean isCompaction) {
+    blockCategory.expectSpecific();
+    incrNumericMetric(blockCategory, isCompaction,
+            BlockMetricType.L2_READ_COUNT);
+    incrNumericMetric(blockCategory, isCompaction,
+            BlockMetricType.L2_CACHE_HIT);
+  }
+
+  /**
+   * Updates the number of misses and the total number of block reads on an
+   * L2 cache miss.
+   */
+  private void updateOnL2CacheMiss(BlockCategory blockCategory,
+                                boolean isCompaction) {
+    blockCategory.expectSpecific();
+    incrNumericMetric(blockCategory, isCompaction,
+            BlockMetricType.L2_CACHE_MISS);
+    incrNumericMetric(blockCategory, isCompaction,
+            BlockMetricType.L2_READ_COUNT);
+  }
+
   private void addToPreloadReadTime(BlockCategory blockCategory,
       boolean isCompaction, long timeMs) {
     HRegion.incrTimeVaryingMetric(getBlockMetricName(blockCategory,
@@ -600,29 +616,22 @@ public class SchemaMetrics {
   /**
    * Updates preload read time and the number of preload cache misses.
    */
-  public void updateOnPreloadCacheMiss(BlockCategory blockCategory, boolean isCompaction,
+  private void updateOnPreloadCacheMiss(BlockCategory blockCategory, boolean isCompaction,
       long timeMs) {
     blockCategory.expectSpecific();
     addToPreloadReadTime(blockCategory, isCompaction, timeMs);
     incrNumericMetric(blockCategory, isCompaction, BlockMetricType.PRELOAD_CACHE_MISS);
-    if (this != ALL_SCHEMA_METRICS) {
-      ALL_SCHEMA_METRICS.updateOnPreloadCacheMiss(blockCategory, isCompaction, timeMs);
-    }
   }
 
   /**
    * Updates preload read time and the number of preload cache hits.
    */
-  public void updateOnPreloadCacheHit(BlockCategory blockCategory,
+  private void updateOnPreloadCacheHit(BlockCategory blockCategory,
       boolean isCompaction, long timeMs) {
     blockCategory.expectSpecific();
     addToPreloadReadTime(blockCategory, isCompaction, timeMs);
     incrNumericMetric(blockCategory, isCompaction,
-      BlockMetricType.PRELOAD_CACHE_HIT);
-    if (this != ALL_SCHEMA_METRICS) {
-      ALL_SCHEMA_METRICS.updateOnPreloadCacheMiss(blockCategory, isCompaction,
-        timeMs);
-    }
+            BlockMetricType.PRELOAD_CACHE_HIT);
   }
   
   /**
@@ -649,6 +658,21 @@ public class SchemaMetrics {
     }
   }
 
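+  /**
+   * Adjusts the per-category L2 cache size and block-count gauges by the
+   * given delta (positive on a put, negative on an eviction), then repeats
+   * the update for ALL_CATEGORIES so the aggregates stay consistent.
+   */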
+  private void updateL2CacheSize(BlockCategory category, long delta) {
+    if (category == null) {
+      category = BlockCategory.ALL_CATEGORIES;
+    }
+    HRegion.incrNumericPersistentMetric(getBlockMetricName(category,
+            DEFAULT_COMPACTION_FLAG, BlockMetricType.L2_CACHE_NUM),
+            delta > 0 ? 1 : -1);
+    HRegion.incrNumericPersistentMetric(getBlockMetricName(category,
+            DEFAULT_COMPACTION_FLAG, BlockMetricType.L2_CACHE_SIZE), delta);
+
+    if (category != BlockCategory.ALL_CATEGORIES) {
+      updateL2CacheSize(BlockCategory.ALL_CATEGORIES, delta);
+    }
+  }
+
   /**
    * Updates the number and the total size of blocks in cache for both the configured table/CF
    * and all table/CFs (by calling the same method on {@link #ALL_SCHEMA_METRICS}), both the given
@@ -670,6 +694,16 @@ public class SchemaMetrics {
     }
   }
 
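+  /**
+   * Updates the L2 cache size gauges and the cached/evicted counters for
+   * both this table/CF and the aggregate ALL_SCHEMA_METRICS after a block
+   * is added to (delta > 0) or evicted from (delta < 0) the L2 cache.
+   */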
+  public void updateOnL2CachePutOrEvict(BlockCategory blockCategory,
+                                        long delta) {
+    updateL2CacheSize(blockCategory, delta);
+    incrNumericMetric(blockCategory, DEFAULT_COMPACTION_FLAG,
+            delta > 0 ? BlockMetricType.L2_CACHED : BlockMetricType.L2_EVICTED);
+    if (this != ALL_SCHEMA_METRICS) {
+      ALL_SCHEMA_METRICS.updateOnL2CachePutOrEvict(blockCategory, delta);
+    }
+  }
+
   /**
    * Increments both the per-CF and the aggregate counter of bloom
    * positives/negatives as specified by the argument.

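A net effect of the SchemaMetrics changes: updateOnBlockRead is now the single
public entry point for read metrics and handles the ALL_SCHEMA_METRICS roll-up
itself, while the hit/miss helpers went private. For a per-CF instance named
metrics (any SchemaMetrics will do), the three flag combinations look like:

    BlockType.BlockCategory cat = BlockType.BlockCategory.DATA;
    long readMs = 12;
    // Served from the L1 block cache: counts an L1 hit and a block read.
    metrics.updateOnBlockRead(cat, false, readMs, true, false, false);
    // Served from the L2 cache only: counts an L2 hit (no L1 miss is recorded).
    metrics.updateOnBlockRead(cat, false, readMs, false, true, false);
    // Missed both caches: counts both an L1 and an L2 miss.
    metrics.updateOnBlockRead(cat, false, readMs, false, false, false);

Read time is added in every case, matching the TestSchemaMetrics changes below.
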
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestL2BucketCache.java Wed Jan  8 19:18:24 2014
@@ -21,17 +21,18 @@ package org.apache.hadoop.hbase.io.hfile
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.ClientConfigurationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaConfigured;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
 import org.junit.After;
 import org.junit.Before;
@@ -40,9 +41,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.*;
@@ -72,6 +71,8 @@ public class TestL2BucketCache {
 
   private Configuration conf;
   private CacheConfig cacheConf;
+  private HColumnDescriptor family;
+  private HRegion region;
   private FileSystem fs;
   private Path storeFilePath;
 
@@ -115,7 +116,7 @@ public class TestL2BucketCache {
   }
 
   @After
-  public void tearDown() {
+  public void tearDown() throws IOException {
     underlyingCache.shutdown();
   }
 
@@ -145,8 +146,8 @@ public class TestL2BucketCache {
           new BlockCacheKey(reader.getName(), offset), true) != null;
       if (isInL1Lcache) {
         cachedCount++;
-        byte[] blockFromCacheRaw =
-            mockedL2Cache.getRawBlock(reader.getName(), offset);
+        BlockCacheKey key = new BlockCacheKey(reader.getName(), offset);
+        byte[] blockFromCacheRaw = mockedL2Cache.getRawBlockBytes(key);
         assertNotNull("All blocks in l1 cache, should also be in l2 cache: "
             + blockFromDisk.toString(), blockFromCacheRaw);
         HFileBlock blockFromL2Cache = HFileBlock.fromBytes(blockFromCacheRaw,
@@ -176,7 +177,8 @@ public class TestL2BucketCache {
     while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
       HFileBlock blockFromDisk = reader.readBlock(offset, -1, true, false,
               false, null, encodingInCache, null);
-      assertNotNull(mockedL2Cache.getRawBlock(reader.getName(), offset));
+      BlockCacheKey key = new BlockCacheKey(reader.getName(), offset);
+      assertNotNull(mockedL2Cache.getRawBlockBytes(key));
       cacheConf.getBlockCache().evictBlock(new BlockCacheKey(reader.getName(),
               offset));
       HFileBlock blockFromL2Cache = reader.readBlock(offset, -1, true, false,
@@ -227,12 +229,78 @@ public class TestL2BucketCache {
     assertFalse(cacheConf.isL2CacheEnabled());
   }
 
+  @Test
+  public void shouldUpdatePerBlockCategoryMetrics() throws Exception {
+    Map<String, Long> snapshot = SchemaMetrics.getMetricsSnapshot();
+    writeStoreFile();
+
+    // Get an unknown table and CF.
+    SchemaConfigured unknownSchema = new SchemaConfigured(conf, null);
+    // Get the test table and CF.
+    SchemaConfigured testSchema = new SchemaConfigured(conf, storeFilePath);
+
+    assertEquals(
+            "Unknown schema DATA category size in L2 cache should be zero",
+            0, getL2CacheSize(unknownSchema, BlockType.BlockCategory.DATA));
+    assertTrue(
+            "Test schema DATA category size in L2 cache should not be zero",
+            getL2CacheSize(testSchema, BlockType.BlockCategory.DATA) > 0);
+    // Validate that the per-cf-category metrics add up with the all-cf-category
+    // metrics.
+    SchemaMetrics.validateMetricChanges(snapshot);
+
+    readAndEvictBlocksFromL2Cache();
+
+    assertTrue("Test schema should have L2 cache hits",
+            getL2CacheHitCount(testSchema, BlockType.BlockCategory.DATA) > 0);
+    assertEquals("L2 cache should be empty", 0,
+            underlyingCache.getBlockCount());
+    assertEquals("DATA category size in L2 cache should be zero", 0,
+            getL2CacheSize(testSchema, BlockType.BlockCategory.DATA));
+    SchemaMetrics.validateMetricChanges(snapshot);
+  }
+
+  private void readAndEvictBlocksFromL2Cache() throws IOException {
+    long offset = 0;
+    HFileReaderV2 reader = (HFileReaderV2) HFile.createReaderWithEncoding(fs,
+            storeFilePath, cacheConf, ENCODER.getEncodingInCache());
+    // Clear the BlockCache to make sure reads are satisfied from L2 cache or
+    // disk.
+    cacheConf.getBlockCache().clearCache();
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      HFileBlock block = reader.readBlock(offset, -1, true, false,
+              false, null, reader.getEffectiveEncodingInCache(false), null);
+      mockedL2Cache.evictRawBlock(new BlockCacheKey(reader.getName(), offset));
+      offset += block.getOnDiskSizeWithHeader();
+    }
+  }
+
+  private long getL2CacheSize(SchemaConfigured schema,
+                              BlockType.BlockCategory category) {
+    return HRegion.getNumericPersistentMetric(
+            schema.getSchemaMetrics().getBlockMetricName(category, false,
+                    SchemaMetrics.BlockMetricType.L2_CACHE_SIZE));
+  }
+
+  private long getL2CacheHitCount(SchemaConfigured schema,
+                                  BlockType.BlockCategory category) {
+    return HRegion.getNumericMetric(
+            schema.getSchemaMetrics().getBlockMetricName(category, false,
+                    SchemaMetrics.BlockMetricType.L2_CACHE_HIT));
+  }
+
   private void writeStoreFile() throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getTestDir(),
-        "test_cache_on_write");
+    // Generate a random family name in order to keep different tests apart.
+    // This is important for the metrics test.
+    family = new HColumnDescriptor(
+            Integer.toHexString(new Random().nextInt(1023)));
+    region = TEST_UTIL.createTestRegion(TestL2BucketCache.class.getSimpleName(),
+            family);
+    Path storeHomeDir = Store.getStoreHomedir(region.getRegionDir(),
+            region.getRegionInfo().getEncodedName(), family.getName());
     StoreFile.Writer sfw = new StoreFile.WriterBuilder(conf, cacheConf, fs,
         DATA_BLOCK_SIZE)
-        .withOutputDir(storeFileParentDir)
+        .withOutputDir(storeHomeDir)
         .withCompression(Compression.Algorithm.GZ)
         .withDataBlockEncoder(ENCODER)
         .withComparator(KeyValue.COMPARATOR)
@@ -271,26 +339,35 @@ public class TestL2BucketCache {
     }
 
     @Override
-    public byte[] getRawBlock(String hfileName, long dataBlockOffset) {
+    public byte[] getRawBlockBytes(BlockCacheKey key) {
       byte[] ret = null;
       if (enableReads.get()) {
-        ret = underlying.getRawBlock(hfileName, dataBlockOffset);
+        ret = underlying.getRawBlockBytes(key);
         if (LOG.isTraceEnabled()) {
           LOG.trace("Cache " + (ret == null ?"miss":"hit")  +
-              " for hfileName=" + hfileName + ", offset=" + dataBlockOffset);
+              " for hfileName=" + key.getHfileName() +
+              ", offset=" + key.getOffset());
         }
       }
       return ret;
     }
 
     @Override
-    public void cacheRawBlock(String hfileName, long dataBlockOffset,
-        byte[] rawBlock) {
+    public boolean cacheRawBlock(BlockCacheKey cacheKey,
+                                 RawHFileBlock rawBlock) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Caching " + rawBlock.getData().length + " bytes, hfileName=" +
+            cacheKey.getHfileName() + ", offset=" + cacheKey.getOffset());
+      }
+      return underlying.cacheRawBlock(cacheKey, rawBlock);
+    }
+
+    @Override
+    public boolean evictRawBlock(BlockCacheKey cacheKey) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Caching " + rawBlock.length + " bytes, hfileName=" +
-            hfileName + ", offset=" + dataBlockOffset);
+        LOG.trace("Evicting " + cacheKey);
       }
-      underlying.cacheRawBlock(hfileName, dataBlockOffset, rawBlock);
+      return underlying.evictRawBlock(cacheKey);
     }
 
     @Override
@@ -299,8 +376,8 @@ public class TestL2BucketCache {
     }
 
     @Override
-    public boolean isShutdown() {
-      return underlying.isShutdown();
+    public boolean isEnabled() {
+      return underlying.isEnabled();
     }
 
     @Override

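Taken together, the mock's overrides trace the reshaped L2Cache surface:
lookups and evictions are keyed by BlockCacheKey instead of an (hfileName,
offset) pair, puts and evictions report success, and isShutdown() has flipped
polarity to isEnabled(). Reconstructed from just the methods this test touches
(the real L2Cache.java may declare more members):

    public interface L2Cache {
      byte[] getRawBlockBytes(BlockCacheKey key);
      boolean cacheRawBlock(BlockCacheKey cacheKey, RawHFileBlock rawBlock);
      boolean evictRawBlock(BlockCacheKey cacheKey);
      boolean isEnabled();
      // shutdown() and any remaining members are not visible in these hunks.
    }
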
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java Wed Jan  8 19:18:24 2014
@@ -25,7 +25,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.MultithreadedTestUtil;
 
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.RawHFileBlock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -135,7 +137,8 @@ public class TestBucketCache {
 
     // Add blocks
     for (BlockOnDisk block : blocks) {
-      cache.cacheBlock(block.blockName, block.block);
+      cache.cacheBlock(block.blockName,
+              new RawHFileBlock(BlockType.DATA, block.block));
     }
 
     // Check if all blocks are properly cached and contain the right
@@ -155,7 +158,8 @@ public class TestBucketCache {
     for (BlockOnDisk block : blocks) {
       try {
         if (cache.getBlock(block.blockName, true) != null) {
-          cache.cacheBlock(block.blockName, block.block);
+          cache.cacheBlock(block.blockName,
+                  new RawHFileBlock(BlockType.DATA, block.block));
         }
       } catch (RuntimeException re) {
         // expected
@@ -174,7 +178,7 @@ public class TestBucketCache {
         conf);
 
     final AtomicInteger totalQueries = new AtomicInteger();
-    cache.cacheBlock(key, buf);
+    cache.cacheBlock(key, new RawHFileBlock(BlockType.DATA, buf));
 
     for (int i = 0; i < NUM_THREADS; i++) {
       MultithreadedTestUtil.TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
@@ -202,7 +206,8 @@ public class TestBucketCache {
     cache.stopWriterThreads();
     BlockOnDisk[] blocks = generateDiskBlocks(BLOCK_SIZE, 1);
     long heapSize = cache.heapSize();
-    cache.cacheBlock(blocks[0].blockName, blocks[0].block);
+    cache.cacheBlock(blocks[0].blockName,
+            new RawHFileBlock(BlockType.DATA, blocks[0].block));
 
     /*When we cache something HeapSize should always increase */
     assertTrue(heapSize < cache.heapSize());
@@ -254,20 +259,20 @@ public class TestBucketCache {
     }
 
     @Override
-    public void cacheBlock(BlockCacheKey cacheKey, byte[] buf,
+    public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block,
         boolean inMemory) {
       if (super.getBlock(cacheKey, true, false) != null) {
         throw new RuntimeException("Cached an already cached block");
       }
-      super.cacheBlock(cacheKey, buf, inMemory);
+      return super.cacheBlock(cacheKey, block, inMemory);
     }
 
     @Override
-    public void cacheBlock(BlockCacheKey cacheKey, byte[] buf) {
+    public boolean cacheBlock(BlockCacheKey cacheKey, RawHFileBlock block) {
       if (super.getBlock(cacheKey, true, false) != null) {
         throw new RuntimeException("Cached an already cached block");
       }
-      super.cacheBlock(cacheKey, buf);
+      return super.cacheBlock(cacheKey, block);
     }
   }
 }

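The TestBucketCache changes show the caller-side contract after this commit:
raw bytes are wrapped in a RawHFileBlock so the block type travels with them,
and cacheBlock now returns a boolean. Assuming the return value signals
whether the put was accepted (the tests only propagate it), a minimal caller
fragment:

    // Assume `cache` is an already-constructed BucketCache; the failure
    // handling here is an assumption, not something this diff specifies.
    BlockCacheKey key = new BlockCacheKey("somehfile", 0L);
    RawHFileBlock raw = new RawHFileBlock(BlockType.DATA, new byte[4096]);
    if (!cache.cacheBlock(key, raw)) {
      // Put was not accepted; the block remains readable from L1 or disk.
    }
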
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java?rev=1556609&r1=1556608&r2=1556609&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/metrics/TestSchemaMetrics.java Wed Jan  8 19:18:24 2014
@@ -252,9 +252,10 @@ public class TestSchemaMetrics {
           }
 
           for (boolean isCompaction : BOOL_VALUES) {
-            sm.updateOnCacheHit(blockCat, isCompaction);
+            sm.updateOnBlockRead(blockCat, isCompaction, 0, true, false, false);
             checkMetrics();
-            sm.updateOnCacheMiss(blockCat, isCompaction, rand.nextInt(READ_TIME_MS_RANGE));
+            sm.updateOnBlockRead(blockCat, isCompaction,
+                    rand.nextInt(READ_TIME_MS_RANGE), false, false, false);
             checkMetrics();
           }