Posted to commits@hbase.apache.org by zh...@apache.org on 2018/12/31 12:45:51 UTC

[15/47] hbase git commit: HBASE-21514 Refactor CacheConfig

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 6242d36..13f277b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.Timer;
@@ -98,7 +99,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -114,7 +115,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
 import org.apache.hadoop.hbase.master.RegionState.State;
-import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore;
@@ -410,10 +411,10 @@ public class HRegionServer extends HasThread implements
 
   private final RegionServerAccounting regionServerAccounting;
 
-  // Cache configuration and block cache reference
-  protected CacheConfig cacheConfig;
-  // Cache configuration for mob
-  final MobCacheConfig mobCacheConfig;
+  // Block cache
+  private BlockCache blockCache;
+  // The cache for mob files
+  private MobFileCache mobFileCache;
 
   /** The health check chore. */
   private HealthCheckChore healthCheckChore;
@@ -591,12 +592,12 @@ public class HRegionServer extends HasThread implements
 
       boolean isMasterNotCarryTable =
           this instanceof HMaster && !LoadBalancer.isTablesOnMaster(conf);
-      // no need to instantiate global block cache when master not carry table
+
+      // No need to instantiate the block cache and mob file cache when the master does not carry tables
       if (!isMasterNotCarryTable) {
-        CacheConfig.instantiateBlockCache(conf);
+        blockCache = BlockCacheFactory.createBlockCache(conf);
+        mobFileCache = new MobFileCache(conf);
       }
-      cacheConfig = new CacheConfig(conf);
-      mobCacheConfig = new MobCacheConfig(conf);
 
       uncaughtExceptionHandler = new UncaughtExceptionHandler() {
         @Override
@@ -1062,10 +1063,12 @@ public class HRegionServer extends HasThread implements
       }
     }
     // Send cache a shutdown.
-    if (cacheConfig != null && cacheConfig.isBlockCacheEnabled()) {
-      cacheConfig.getBlockCache().shutdown();
+    if (blockCache != null) {
+      blockCache.shutdown();
+    }
+    if (mobFileCache != null) {
+      mobFileCache.shutdown();
     }
-    mobCacheConfig.getMobFileCache().shutdown();
 
     if (movedRegionsCleaner != null) {
       movedRegionsCleaner.stop("Region Server stopping");
@@ -1607,9 +1610,9 @@ public class HRegionServer extends HasThread implements
   }
 
   private void startHeapMemoryManager() {
-    this.hMemManager = HeapMemoryManager.create(this.conf, this.cacheFlusher, this,
-        this.regionServerAccounting);
-    if (this.hMemManager != null) {
+    if (this.blockCache != null) {
+      this.hMemManager =
+          new HeapMemoryManager(this.blockCache, this.cacheFlusher, this, regionServerAccounting);
       this.hMemManager.start(getChoreService());
     }
   }
@@ -3614,10 +3617,23 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
-   * @return The cache config instance used by the regionserver.
+   * May be empty if this is a master that does not carry tables.
+   *
+   * @return The block cache instance used by the regionserver.
+   */
+  @Override
+  public Optional<BlockCache> getBlockCache() {
+    return Optional.ofNullable(this.blockCache);
+  }
+
+  /**
+   * May be empty if this is a master that does not carry tables.
+   *
+   * @return The cache for mob files used by the regionserver.
    */
-  public CacheConfig getCacheConfig() {
-    return this.cacheConfig;
+  @Override
+  public Optional<MobFileCache> getMobFileCache() {
+    return Optional.ofNullable(this.mobFileCache);
   }
 
   /**
@@ -3646,7 +3662,6 @@ public class HRegionServer extends HasThread implements
   }
 
   public CacheEvictionStats clearRegionBlockCache(Region region) {
-    BlockCache blockCache = this.getCacheConfig().getBlockCache();
     long evictedBlocks = 0;
 
     for(Store store : region.getStores()) {
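
Review note: after this hunk the region server owns its caches directly
instead of reaching them through CacheConfig's static state. Below is a
minimal sketch of the new lifecycle, assuming only what is visible in the
patch (BlockCacheFactory.createBlockCache(conf), the MobFileCache(conf)
constructor, the Optional-returning getters, and per-cache shutdown); the
class and method names are illustrative, not part of the patch.

    import java.util.Optional;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.mob.MobFileCache;

    // Illustrative sketch of the cache lifecycle HRegionServer now implements.
    class CacheLifecycleSketch {
      private BlockCache blockCache;
      private MobFileCache mobFileCache;

      void construct(Configuration conf, boolean isMasterNotCarryTable) {
        // Caches are created only when this process actually serves regions;
        // a master not carrying tables leaves both fields null.
        if (!isMasterNotCarryTable) {
          blockCache = BlockCacheFactory.createBlockCache(conf);
          mobFileCache = new MobFileCache(conf);
        }
      }

      // Callers must handle absence instead of assuming a global cache exists.
      Optional<BlockCache> getBlockCache() {
        return Optional.ofNullable(blockCache);
      }

      Optional<MobFileCache> getMobFileCache() {
        return Optional.ofNullable(mobFileCache);
      }

      void stop() {
        // Shut each cache down independently; either may be absent.
        if (blockCache != null) {
          blockCache.shutdown();
        }
        if (mobFileCache != null) {
          mobFileCache.shutdown();
        }
      }
    }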

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 032dc5f..b3e5b97 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -377,7 +377,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
    * @param family The current column family.
    */
   protected void createCacheConf(final ColumnFamilyDescriptor family) {
-    this.cacheConf = new CacheConfig(conf, family);
+    this.cacheConf = new CacheConfig(conf, family, region.getBlockCache());
   }
 
   /**
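
Review note: the store-level CacheConfig is now built from an explicitly
passed BlockCache instead of resolving one from global state. A sketch of
the new construction path, assuming the three-argument CacheConfig
constructor used in this hunk (passing null covers a process without a
block cache); the wrapper class is illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    final class StoreCacheConfSketch {
      static CacheConfig createCacheConf(Configuration conf, ColumnFamilyDescriptor family,
          BlockCache regionBlockCache) {
        // The cache travels region -> store explicitly; no CacheConfig statics.
        return new CacheConfig(conf, family, regionBlockCache);
      }
    }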

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index c32fce2..a96417d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -30,13 +30,14 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Server;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
 import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;
 import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -105,20 +106,11 @@ public class HeapMemoryManager {
 
   private List<HeapMemoryTuneObserver> tuneObservers = new ArrayList<>();
 
-  public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
-      Server server, RegionServerAccounting regionServerAccounting) {
-    ResizableBlockCache lruCache = CacheConfig.getOnHeapCache(conf);
-    if (lruCache != null) {
-      return new HeapMemoryManager(lruCache, memStoreFlusher, server, regionServerAccounting);
-    }
-    return null;
-  }
-
   @VisibleForTesting
-  HeapMemoryManager(ResizableBlockCache blockCache, FlushRequester memStoreFlusher,
+  HeapMemoryManager(BlockCache blockCache, FlushRequester memStoreFlusher,
                 Server server, RegionServerAccounting regionServerAccounting) {
     Configuration conf = server.getConfiguration();
-    this.blockCache = blockCache;
+    this.blockCache = toResizableBlockCache(blockCache);
     this.memStoreFlusher = memStoreFlusher;
     this.server = server;
     this.regionServerAccounting = regionServerAccounting;
@@ -130,6 +122,14 @@ public class HeapMemoryManager {
     metricsHeapMemoryManager = new MetricsHeapMemoryManager();
   }
 
+  private ResizableBlockCache toResizableBlockCache(BlockCache blockCache) {
+    if (blockCache instanceof CombinedBlockCache) {
+      return (ResizableBlockCache) ((CombinedBlockCache) blockCache).getOnHeapCache();
+    } else {
+      return (ResizableBlockCache) blockCache;
+    }
+  }
+
   private boolean doInit(Configuration conf) {
     boolean tuningEnabled = true;
     globalMemStorePercent = MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false);
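
Review note: the static HeapMemoryManager.create(...) factory is gone; the
caller decides based on cache presence (see startHeapMemoryManager in the
HRegionServer hunk above), and the manager unwraps the resizable on-heap
part itself. A sketch of that unwrapping, assuming the invariant this hunk
relies on, namely that the on-heap half of a CombinedBlockCache, and any
standalone cache handed in, implement ResizableBlockCache.

    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
    import org.apache.hadoop.hbase.io.hfile.ResizableBlockCache;

    final class ResizableCacheSketch {
      // Only the on-heap (L1) cache is resized by the heap tuner; the L2 cache
      // is managed outside normal JVM heap tuning.
      static ResizableBlockCache toResizable(BlockCache blockCache) {
        if (blockCache instanceof CombinedBlockCache) {
          return (ResizableBlockCache) ((CombinedBlockCache) blockCache).getOnHeapCache();
        }
        return (ResizableBlockCache) blockCache;
      }
    }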

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index b38c3e0..33a6ee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalDouble;
 import java.util.OptionalLong;
 import java.util.concurrent.ScheduledExecutorService;
@@ -32,9 +33,8 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
-import org.apache.hadoop.hbase.mob.MobCacheConfig;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -59,8 +59,11 @@ class MetricsRegionServerWrapperImpl
   private final HRegionServer regionServer;
   private final MetricsWALSource metricsWALSource;
 
-  private BlockCache blockCache;
-  private MobFileCache mobFileCache;
+  private Optional<BlockCache> blockCache;
+  private Optional<MobFileCache> mobFileCache;
+  private Optional<CacheStats> cacheStats;
+  private Optional<CacheStats> l1Stats = Optional.empty();
+  private Optional<CacheStats> l2Stats = Optional.empty();
 
   private volatile long numStores = 0;
   private volatile long numWALFiles = 0;
@@ -112,9 +115,6 @@ class MetricsRegionServerWrapperImpl
   private volatile long blockedRequestsCount = 0L;
   private volatile long averageRegionSize = 0L;
 
-  private CacheStats cacheStats;
-  private CacheStats l1Stats = null;
-  private CacheStats l2Stats = null;
   private ScheduledExecutorService executor;
   private Runnable runnable;
   private long period;
@@ -149,34 +149,26 @@ class MetricsRegionServerWrapperImpl
     }
   }
 
-  /**
-   * It's possible that due to threading the block cache could not be initialized
-   * yet (testing multiple region servers in one jvm).  So we need to try and initialize
-   * the blockCache and cacheStats reference multiple times until we succeed.
-   */
-  private synchronized  void initBlockCache() {
-    CacheConfig cacheConfig = this.regionServer.cacheConfig;
-    if (cacheConfig != null) {
-      l1Stats = cacheConfig.getOnHeapCacheStats();
-      l2Stats = cacheConfig.getL2CacheStats();
-      if (this.blockCache == null) {
-        this.blockCache = cacheConfig.getBlockCache();
+  private void initBlockCache() {
+    this.blockCache = this.regionServer.getBlockCache();
+    this.cacheStats = this.blockCache.map(BlockCache::getStats);
+    if (this.cacheStats.isPresent()) {
+      if (this.cacheStats.get() instanceof CombinedBlockCache.CombinedCacheStats) {
+        l1Stats = Optional
+            .of(((CombinedBlockCache.CombinedCacheStats) this.cacheStats.get()).getLruCacheStats());
+        l2Stats = Optional.of(((CombinedBlockCache.CombinedCacheStats) this.cacheStats.get())
+            .getBucketCacheStats());
+      } else {
+        l1Stats = this.cacheStats;
       }
     }
-
-    if (this.blockCache != null && this.cacheStats == null) {
-      this.cacheStats = blockCache.getStats();
-    }
   }
 
   /**
    * Initializes the mob file cache.
    */
-  private synchronized void initMobFileCache() {
-    MobCacheConfig mobCacheConfig = this.regionServer.mobCacheConfig;
-    if (mobCacheConfig != null && this.mobFileCache == null) {
-      this.mobFileCache = mobCacheConfig.getMobFileCache();
-    }
+  private void initMobFileCache() {
+    this.mobFileCache = this.regionServer.getMobFileCache();
   }
 
   @Override
@@ -281,10 +273,7 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public long getBlockCacheCount() {
-    if (this.blockCache == null) {
-      return 0;
-    }
-    return this.blockCache.getBlockCount();
+    return this.blockCache.map(BlockCache::getBlockCount).orElse(0L);
   }
 
   @Override
@@ -294,74 +283,47 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public long getBlockCacheSize() {
-    if (this.blockCache == null) {
-      return 0;
-    }
-    return this.blockCache.getCurrentSize();
+    return this.blockCache.map(BlockCache::getCurrentSize).orElse(0L);
   }
 
   @Override
   public long getBlockCacheFreeSize() {
-    if (this.blockCache == null) {
-      return 0;
-    }
-    return this.blockCache.getFreeSize();
+    return this.blockCache.map(BlockCache::getFreeSize).orElse(0L);
   }
 
   @Override
   public long getBlockCacheHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return this.cacheStats.getHitCount();
+    return this.cacheStats.map(CacheStats::getHitCount).orElse(0L);
   }
 
   @Override
   public long getBlockCachePrimaryHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return this.cacheStats.getPrimaryHitCount();
+    return this.cacheStats.map(CacheStats::getPrimaryHitCount).orElse(0L);
   }
 
   @Override
   public long getBlockCacheMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return this.cacheStats.getMissCount();
+    return this.cacheStats.map(CacheStats::getMissCount).orElse(0L);
   }
 
   @Override
   public long getBlockCachePrimaryMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return this.cacheStats.getPrimaryMissCount();
+    return this.cacheStats.map(CacheStats::getPrimaryMissCount).orElse(0L);
   }
 
   @Override
   public long getBlockCacheEvictedCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return this.cacheStats.getEvictedCount();
+    return this.cacheStats.map(CacheStats::getEvictedCount).orElse(0L);
   }
 
   @Override
   public long getBlockCachePrimaryEvictedCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return this.cacheStats.getPrimaryEvictedCount();
+    return this.cacheStats.map(CacheStats::getPrimaryEvictedCount).orElse(0L);
   }
 
   @Override
   public double getBlockCacheHitPercent() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    double ratio = this.cacheStats.getHitRatio();
+    double ratio = this.cacheStats.map(CacheStats::getHitRatio).orElse(0.0);
     if (Double.isNaN(ratio)) {
       ratio = 0;
     }
@@ -370,12 +332,7 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public double getBlockCacheHitCachingPercent() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-
-    double ratio = this.cacheStats.getHitCachingRatio();
-
+    double ratio = this.cacheStats.map(CacheStats::getHitCachingRatio).orElse(0.0);
     if (Double.isNaN(ratio)) {
       ratio = 0;
     }
@@ -384,74 +341,47 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public long getBlockCacheFailedInsertions() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return this.cacheStats.getFailedInserts();
+    return this.cacheStats.map(CacheStats::getFailedInserts).orElse(0L);
   }
 
   @Override
   public long getL1CacheHitCount() {
-    if (this.l1Stats == null) {
-      return 0;
-    }
-    return this.l1Stats.getHitCount();
+    return this.l1Stats.map(CacheStats::getHitCount).orElse(0L);
   }
 
   @Override
   public long getL1CacheMissCount() {
-    if (this.l1Stats == null) {
-      return 0;
-    }
-    return this.l1Stats.getMissCount();
+    return this.l1Stats.map(CacheStats::getMissCount).orElse(0L);
   }
 
   @Override
   public double getL1CacheHitRatio() {
-    if (this.l1Stats == null) {
-      return 0;
-    }
-    return this.l1Stats.getHitRatio();
+    return this.l1Stats.map(CacheStats::getHitRatio).orElse(0.0);
   }
 
   @Override
   public double getL1CacheMissRatio() {
-    if (this.l1Stats == null) {
-      return 0;
-    }
-    return this.l1Stats.getMissRatio();
+    return this.l1Stats.map(CacheStats::getMissRatio).orElse(0.0);
   }
 
   @Override
   public long getL2CacheHitCount() {
-    if (this.l2Stats == null) {
-      return 0;
-    }
-    return this.l2Stats.getHitCount();
+    return this.l2Stats.map(CacheStats::getHitCount).orElse(0L);
   }
 
   @Override
   public long getL2CacheMissCount() {
-    if (this.l2Stats == null) {
-      return 0;
-    }
-    return this.l2Stats.getMissCount();
+    return this.l2Stats.map(CacheStats::getMissCount).orElse(0L);
   }
 
   @Override
   public double getL2CacheHitRatio() {
-    if (this.l2Stats == null) {
-      return 0;
-    }
-    return this.l2Stats.getHitRatio();
+    return this.l2Stats.map(CacheStats::getHitRatio).orElse(0.0);
   }
 
   @Override
   public double getL2CacheMissRatio() {
-    if (this.l2Stats == null) {
-      return 0;
-    }
-    return this.l2Stats.getMissRatio();
+    return this.l2Stats.map(CacheStats::getMissRatio).orElse(0.0);
   }
 
   @Override public void forceRecompute() {
@@ -741,9 +671,6 @@ class MetricsRegionServerWrapperImpl
     @Override
     synchronized public void run() {
       try {
-        initBlockCache();
-        initMobFileCache();
-
         HDFSBlocksDistribution hdfsBlocksDistribution =
             new HDFSBlocksDistribution();
         HDFSBlocksDistribution hdfsBlocksDistributionSecondaryRegions =
@@ -945,12 +872,14 @@ class MetricsRegionServerWrapperImpl
         mobFlushedCellsSize = tempMobFlushedCellsSize;
         mobScanCellsCount = tempMobScanCellsCount;
         mobScanCellsSize = tempMobScanCellsSize;
-        mobFileCacheAccessCount = mobFileCache.getAccessCount();
-        mobFileCacheMissCount = mobFileCache.getMissCount();
-        mobFileCacheHitRatio = Double.
-            isNaN(mobFileCache.getHitRatio())?0:mobFileCache.getHitRatio();
-        mobFileCacheEvictedCount = mobFileCache.getEvictedFileCount();
-        mobFileCacheCount = mobFileCache.getCacheSize();
+        mobFileCacheAccessCount = mobFileCache.map(MobFileCache::getAccessCount).orElse(0L);
+        mobFileCacheMissCount = mobFileCache.map(MobFileCache::getMissCount).orElse(0L);
+        mobFileCacheHitRatio = mobFileCache.map(MobFileCache::getHitRatio).orElse(0.0);
+        if (Double.isNaN(mobFileCacheHitRatio)) {
+          mobFileCacheHitRatio = 0.0;
+        }
+        mobFileCacheEvictedCount = mobFileCache.map(MobFileCache::getEvictedFileCount).orElse(0L);
+        mobFileCacheCount = mobFileCache.map(MobFileCache::getCacheSize).orElse(0);
         blockedRequestsCount = tempBlockedRequestsCount;
       } catch (Throwable e) {
         LOG.warn("Caught exception! Will suppress and retry.", e);
@@ -980,161 +909,101 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public long getDataMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getDataMissCount();
+    return this.cacheStats.map(CacheStats::getDataMissCount).orElse(0L);
   }
 
   @Override
   public long getLeafIndexMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getLeafIndexMissCount();
+    return this.cacheStats.map(CacheStats::getLeafIndexMissCount).orElse(0L);
   }
 
   @Override
   public long getBloomChunkMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getBloomChunkMissCount();
+    return this.cacheStats.map(CacheStats::getBloomChunkMissCount).orElse(0L);
   }
 
   @Override
   public long getMetaMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getMetaMissCount();
+    return this.cacheStats.map(CacheStats::getMetaMissCount).orElse(0L);
   }
 
   @Override
   public long getRootIndexMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getRootIndexMissCount();
+    return this.cacheStats.map(CacheStats::getRootIndexMissCount).orElse(0L);
   }
 
   @Override
   public long getIntermediateIndexMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getIntermediateIndexMissCount();
+    return this.cacheStats.map(CacheStats::getIntermediateIndexMissCount).orElse(0L);
   }
 
   @Override
   public long getFileInfoMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getFileInfoMissCount();
+    return this.cacheStats.map(CacheStats::getFileInfoMissCount).orElse(0L);
   }
 
   @Override
   public long getGeneralBloomMetaMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getGeneralBloomMetaMissCount();
+    return this.cacheStats.map(CacheStats::getGeneralBloomMetaMissCount).orElse(0L);
   }
 
   @Override
   public long getDeleteFamilyBloomMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getDeleteFamilyBloomMissCount();
+    return this.cacheStats.map(CacheStats::getDeleteFamilyBloomMissCount).orElse(0L);
   }
 
   @Override
   public long getTrailerMissCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getTrailerMissCount();
+    return this.cacheStats.map(CacheStats::getTrailerMissCount).orElse(0L);
   }
 
   @Override
   public long getDataHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getDataHitCount();
+    return this.cacheStats.map(CacheStats::getDataHitCount).orElse(0L);
   }
 
   @Override
   public long getLeafIndexHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getLeafIndexHitCount();
+    return this.cacheStats.map(CacheStats::getLeafIndexHitCount).orElse(0L);
   }
 
   @Override
   public long getBloomChunkHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getBloomChunkHitCount();
+    return this.cacheStats.map(CacheStats::getBloomChunkHitCount).orElse(0L);
   }
 
   @Override
   public long getMetaHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getMetaHitCount();
+    return this.cacheStats.map(CacheStats::getMetaHitCount).orElse(0L);
   }
 
   @Override
   public long getRootIndexHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getRootIndexHitCount();
+    return this.cacheStats.map(CacheStats::getRootIndexHitCount).orElse(0L);
   }
 
   @Override
   public long getIntermediateIndexHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getIntermediateIndexHitCount();
+    return this.cacheStats.map(CacheStats::getIntermediateIndexHitCount).orElse(0L);
   }
 
   @Override
   public long getFileInfoHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getFileInfoHitCount();
+    return this.cacheStats.map(CacheStats::getFileInfoHitCount).orElse(0L);
   }
 
   @Override
   public long getGeneralBloomMetaHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getGeneralBloomMetaHitCount();
+    return this.cacheStats.map(CacheStats::getGeneralBloomMetaHitCount).orElse(0L);
   }
 
   @Override
   public long getDeleteFamilyBloomHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getDeleteFamilyBloomHitCount();
+    return this.cacheStats.map(CacheStats::getDeleteFamilyBloomHitCount).orElse(0L);
   }
 
   @Override
   public long getTrailerHitCount() {
-    if (this.cacheStats == null) {
-      return 0;
-    }
-    return cacheStats.getTrailerHitCount();
+    return this.cacheStats.map(CacheStats::getTrailerHitCount).orElse(0L);
   }
 }
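
Review note: every getter in this file now collapses the old
"if (stats == null) return 0;" guard into a single
Optional.map(...).orElse(...) expression, with the stats resolved once in
initBlockCache(). The old synchronized retry logic is also gone: the cache
is injected at region server construction time, so there is no window in
which it can appear late. A condensed sketch of the pattern, assuming only
BlockCache.getStats() and CacheStats.getHitCount() as used above.

    import java.util.Optional;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.CacheStats;

    final class OptionalMetricsSketch {
      private final Optional<CacheStats> cacheStats;

      OptionalMetricsSketch(Optional<BlockCache> blockCache) {
        // Resolve the stats once; an absent cache yields absent stats.
        this.cacheStats = blockCache.map(BlockCache::getStats);
      }

      long getBlockCacheHitCount() {
        // An absent cache reports 0, matching the old null-guarded behavior.
        return cacheStats.map(CacheStats::getHitCount).orElse(0L);
      }
    }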

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index df84dcf..31df37a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -3683,7 +3684,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         stats.addException(region.getRegionInfo().getRegionName(), e);
       }
     }
-    stats.withMaxCacheSize(regionServer.getCacheConfig().getBlockCache().getMaxSize());
+    stats.withMaxCacheSize(regionServer.getBlockCache().map(BlockCache::getMaxSize).orElse(0L));
     return builder.setStats(ProtobufUtil.toCacheEvictionStats(stats.build())).build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 37a3606..e0638ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.Server;
@@ -31,7 +32,9 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
@@ -266,4 +269,14 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
    * @return Return table descriptors implementation.
    */
   TableDescriptors getTableDescriptors();
-}
+
+  /**
+   * @return The block cache instance.
+   */
+  Optional<BlockCache> getBlockCache();
+
+  /**
+   * @return The cache for mob files.
+   */
+  Optional<MobFileCache> getMobFileCache();
+}
\ No newline at end of file
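
Review note: these two accessors let any holder of RegionServerServices
(coprocessor environments, chores, RPC handlers) reach the caches without
knowing whether the process is a region server or a table-less master. A
sketch of a defensive consumer, mirroring the RSRpcServices usage above;
the helper class is illustrative.

    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.regionserver.RegionServerServices;

    final class CacheSizeSketch {
      // Reports the block cache's max size, or 0 when this process carries
      // no block cache at all (a master not carrying tables).
      static long maxBlockCacheSize(RegionServerServices services) {
        return services.getBlockCache().map(BlockCache::getMaxSize).orElse(0L);
      }
    }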

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 31a7cad..0cd5a22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -104,6 +105,7 @@ import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
 import org.apache.hadoop.hbase.master.assignment.RegionStateStore;
 import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -2512,6 +2514,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     return new WALFactory(confForWAL, "hregion-" + RandomStringUtils.randomNumeric(8)).getWAL(hri);
   }
 
+
   /**
   * Create a region with its own WAL. Be sure to call
    * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
@@ -2526,6 +2529,31 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
    * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
    */
   public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
+      final Configuration conf, final TableDescriptor htd, BlockCache blockCache)
+      throws IOException {
+    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
+    region.setBlockCache(blockCache);
+    region.initialize();
+    return region;
+  }
+  /**
+   * Create a region with its own WAL. Be sure to call
+   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
+   */
+  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
+      final Configuration conf, final TableDescriptor htd, MobFileCache mobFileCache)
+      throws IOException {
+    HRegion region = createRegionAndWAL(info, rootDir, conf, htd, false);
+    region.setMobFileCache(mobFileCache);
+    region.initialize();
+    return region;
+  }
+
+  /**
+   * Create a region with its own WAL. Be sure to call
+   * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} to clean up all resources.
+   */
+  public static HRegion createRegionAndWAL(final RegionInfo info, final Path rootDir,
       final Configuration conf, final TableDescriptor htd, boolean initialize)
       throws IOException {
     ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
@@ -4037,17 +4065,21 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
       + " on server " + server);
   }
 
-  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd)
-      throws IOException {
-    TableDescriptor td
-        = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
-            .setColumnFamily(cd)
-            .build();
-    HRegionInfo info =
-        new HRegionInfo(TableName.valueOf(tableName), null, null, false);
+  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd) throws IOException {
+    TableDescriptor td =
+        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
+    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
     return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td);
   }
 
+  public HRegion createTestRegion(String tableName, ColumnFamilyDescriptor cd,
+      BlockCache blockCache) throws IOException {
+    TableDescriptor td =
+        TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamily(cd).build();
+    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf(tableName)).build();
+    return createRegionAndWAL(info, getDataTestDir(), getConfiguration(), td, blockCache);
+  }
+
   public void setFileSystemURI(String fsURI) {
     FS_URI = fsURI;
   }
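
Review note: the cache-taking overloads rely on two-phase construction: the
boolean overload builds the region with initialize=false, the cache is
injected, and only then does initialize() run, so stores open with the
injected cache already visible. Test-side usage then becomes one call; a
sketch assuming the createTestRegion overload added above, with
illustrative table and family names.

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.regionserver.HRegion;

    final class TestRegionCacheSketch {
      static HRegion openRegionWithCache(HBaseTestingUtility util) throws java.io.IOException {
        // The test owns this cache instance, so assertions can run directly
        // against it instead of against a JVM-global singleton.
        BlockCache cache = BlockCacheFactory.createBlockCache(util.getConfiguration());
        return util.createTestRegion("testTable",
            ColumnFamilyDescriptorBuilder.of("cf"), cache);
      }
    }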

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index ff0a88c..0e4f241 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.locking.EntityLock;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
+import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionSizeStore;
@@ -355,4 +358,14 @@ public class MockRegionServerServices implements RegionServerServices {
   public TableDescriptors getTableDescriptors() {
     return null;
   }
+
+  @Override
+  public Optional<BlockCache> getBlockCache() {
+    return Optional.empty();
+  }
+
+  @Override
+  public Optional<MobFileCache> getMobFileCache() {
+    return Optional.empty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
index d22772a..31c01c0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -142,7 +142,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      final BlockCache cache = cacheConf.getBlockCache();
+      final BlockCache cache = cacheConf.getBlockCache().get();
       // insert data. 5 Rows are added
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, data);
@@ -306,7 +306,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      final BlockCache cache = cacheConf.getBlockCache();
+      final BlockCache cache = cacheConf.getBlockCache().get();
       // insert data. 5 Rows are added
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, data);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index fc4c1f9..7f20195 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -196,7 +196,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       // insert data. 2 Rows are added
       Put put = new Put(ROW);
@@ -286,7 +286,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       insertData(table);
       // flush the data
@@ -345,7 +345,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, data);
@@ -583,7 +583,7 @@ public class TestBlockEvictionFromClient {
       HStore store = region.getStores().iterator().next();
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, data);
@@ -647,7 +647,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       Put put = new Put(ROW);
       put.addColumn(FAMILY, QUALIFIER, data);
@@ -803,7 +803,7 @@ public class TestBlockEvictionFromClient {
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
       // Use the last one
-      cache = cacheConf.getBlockCache();
+      cache = cacheConf.getBlockCache().get();
     }
     return cache;
   }
@@ -830,7 +830,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       // insert data. 2 Rows are added
       insertData(table);
@@ -896,7 +896,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       // insert data. 2 Rows are added
       Put put = new Put(ROW);
@@ -1014,7 +1014,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       // insert data. 2 Rows are added
       Put put = new Put(ROW);
@@ -1144,7 +1144,7 @@ public class TestBlockEvictionFromClient {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
       // insert data. 2 Rows are added
       insertData(table);
       // flush the data

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 65bc3f6..e5ffd73 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -5234,14 +5234,13 @@ public class TestFromClientSide {
       CacheConfig cacheConf = store.getCacheConfig();
       cacheConf.setCacheDataOnWrite(true);
       cacheConf.setEvictOnClose(true);
-      BlockCache cache = cacheConf.getBlockCache();
+      BlockCache cache = cacheConf.getBlockCache().get();
 
       // establish baseline stats
       long startBlockCount = cache.getBlockCount();
       long startBlockHits = cache.getStats().getHitCount();
       long startBlockMiss = cache.getStats().getMissCount();
 
-
       // wait till baseline is stable, (minimal 500 ms)
       for (int i = 0; i < 5; i++) {
         Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
index 11d7bb4..350a316 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/encoding/TestEncodedSeekers.java
@@ -28,15 +28,16 @@ import java.util.Map;
 import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.regionserver.BloomType;
@@ -112,17 +113,17 @@ public class TestEncodedSeekers {
     if(includeTags) {
       testUtil.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 3);
     }
-    CacheConfig.instantiateBlockCache(testUtil.getConfiguration());
+
     LruBlockCache cache =
-      (LruBlockCache)new CacheConfig(testUtil.getConfiguration()).getBlockCache();
-    cache.clearCache();
+        (LruBlockCache) BlockCacheFactory.createBlockCache(testUtil.getConfiguration());
     // Need to disable default row bloom filter for this test to pass.
-    HColumnDescriptor hcd = (new HColumnDescriptor(CF_NAME)).setMaxVersions(MAX_VERSIONS).
-        setDataBlockEncoding(encoding).
-        setBlocksize(BLOCK_SIZE).
-        setBloomFilterType(BloomType.NONE).
-        setCompressTags(compressTags);
-    HRegion region = testUtil.createTestRegion(TABLE_NAME, hcd);
+    ColumnFamilyDescriptor cfd =
+        ColumnFamilyDescriptorBuilder.newBuilder(CF_BYTES).setMaxVersions(MAX_VERSIONS).
+            setDataBlockEncoding(encoding).
+            setBlocksize(BLOCK_SIZE).
+            setBloomFilterType(BloomType.NONE).
+            setCompressTags(compressTags).build();
+    HRegion region = testUtil.createTestRegion(TABLE_NAME, cfd, cache);
 
     //write the data, but leave some in the memstore
     doPuts(region);
@@ -145,7 +146,6 @@ public class TestEncodedSeekers {
     assertTrue(encodingCounts.get(encodingInCache) > 0);
   }
 
-
   private void doPuts(HRegion region) throws IOException{
     LoadTestKVGenerator dataGenerator = new LoadTestKVGenerator(MIN_VALUE_SIZE, MAX_VALUE_SIZE);
      for (int i = 0; i < NUM_ROWS; ++i) {
@@ -175,7 +175,6 @@ public class TestEncodedSeekers {
     }
   }
 
-
   private void doGets(Region region) throws IOException{
     for (int i = 0; i < NUM_ROWS; ++i) {
       final byte[] rowKey = LoadTestKVGenerator.md5PrefixedKey(i).getBytes();
@@ -195,5 +194,4 @@ public class TestEncodedSeekers {
       }
     }
   }
-
 }
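
Review note: this test shows the mechanical migration for callers of the
removed statics: drop CacheConfig.instantiateBlockCache(conf), build the
cache with BlockCacheFactory, and hand it to the region. Because each test
now owns a fresh cache, the old cache.clearCache() call between runs also
disappears. A condensed sketch under those assumptions; the wrapper is
illustrative.

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
    import org.apache.hadoop.hbase.regionserver.HRegion;

    final class SeekersMigrationSketch {
      // Before: CacheConfig.instantiateBlockCache(conf) populated a JVM-global
      // cache, fetched via new CacheConfig(conf).getBlockCache() and cleared
      // between tests. After: each test builds and injects its own cache.
      static HRegion openRegion(HBaseTestingUtility testUtil, String tableName,
          ColumnFamilyDescriptor cfd) throws java.io.IOException {
        LruBlockCache cache =
            (LruBlockCache) BlockCacheFactory.createBlockCache(testUtil.getConfiguration());
        return testUtil.createTestRegion(tableName, cfd, cache);
      }
    }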

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java
index 19919e0..1313f31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -31,7 +33,6 @@ import org.apache.hadoop.hbase.io.hfile.TestCacheConfig.DataCacheEntry;
 import org.apache.hadoop.hbase.io.hfile.TestCacheConfig.IndexCacheEntry;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -51,16 +52,9 @@ public class TestBlockCacheReporting {
 
   @Before
   public void setUp() throws Exception {
-    CacheConfig.clearGlobalInstances();
     this.conf = HBaseConfiguration.create();
   }
 
-  @After
-  public void tearDown() throws Exception {
-    // Let go of current block cache.
-    CacheConfig.clearGlobalInstances();
-  }
-
   private void addDataAndHits(final BlockCache bc, final int count) {
     Cacheable dce = new DataCacheEntry();
     Cacheable ice = new IndexCacheEntry();
@@ -85,39 +79,36 @@ public class TestBlockCacheReporting {
   public void testBucketCache() throws IOException {
     this.conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
     this.conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 100);
-    CacheConfig.instantiateBlockCache(this.conf);
-    CacheConfig cc = new CacheConfig(this.conf);
-    assertTrue(cc.getBlockCache() instanceof CombinedBlockCache);
-    logPerBlock(cc.getBlockCache());
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
+    assertTrue(blockCache instanceof CombinedBlockCache);
+    logPerBlock(blockCache);
     final int count = 3;
-    addDataAndHits(cc.getBlockCache(), count);
+    addDataAndHits(blockCache, count);
     // The below has no asserts.  It is just exercising toString and toJSON code.
-    LOG.info(Objects.toString(cc.getBlockCache().getStats()));
-    BlockCacheUtil.CachedBlocksByFile cbsbf = logPerBlock(cc.getBlockCache());
+    LOG.info(Objects.toString(blockCache.getStats()));
+    BlockCacheUtil.CachedBlocksByFile cbsbf = logPerBlock(blockCache);
     LOG.info(Objects.toString(cbsbf));
     logPerFile(cbsbf);
-    bucketCacheReport(cc.getBlockCache());
+    bucketCacheReport(blockCache);
     LOG.info(BlockCacheUtil.toJSON(cbsbf));
   }
 
   @Test
   public void testLruBlockCache() throws IOException {
-    CacheConfig.instantiateBlockCache(this.conf);
     CacheConfig cc = new CacheConfig(this.conf);
-    assertTrue(cc.isBlockCacheEnabled());
     assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory());
-    assertTrue(cc.getBlockCache() instanceof LruBlockCache);
-    logPerBlock(cc.getBlockCache());
-    addDataAndHits(cc.getBlockCache(), 3);
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
+    assertTrue(blockCache instanceof LruBlockCache);
+    logPerBlock(blockCache);
+    addDataAndHits(blockCache, 3);
     // The below has no asserts.  It is just exercising toString and toJSON code.
-    BlockCache bc = cc.getBlockCache();
-    LOG.info("count=" + bc.getBlockCount() + ", currentSize=" + bc.getCurrentSize() +
-        ", freeSize=" + bc.getFreeSize() );
-    LOG.info(Objects.toString(cc.getBlockCache().getStats()));
-    BlockCacheUtil.CachedBlocksByFile cbsbf = logPerBlock(cc.getBlockCache());
+    LOG.info("count=" + blockCache.getBlockCount() + ", currentSize=" + blockCache.getCurrentSize()
+        + ", freeSize=" + blockCache.getFreeSize());
+    LOG.info(Objects.toString(blockCache.getStats()));
+    BlockCacheUtil.CachedBlocksByFile cbsbf = logPerBlock(blockCache);
     LOG.info(Objects.toString(cbsbf));
     logPerFile(cbsbf);
-    bucketCacheReport(cc.getBlockCache());
+    bucketCacheReport(blockCache);
     LOG.info(BlockCacheUtil.toJSON(cbsbf));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
index 7b6bbb3..4c56fff 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Threads;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -165,33 +164,25 @@ public class TestCacheConfig {
 
   @Before
   public void setUp() throws Exception {
-    CacheConfig.clearGlobalInstances();
     this.conf = HBaseConfiguration.create();
   }
 
-  @After
-  public void tearDown() throws Exception {
-    // Let go of current block cache.
-    CacheConfig.clearGlobalInstances();
-  }
-
   /**
-   * @param cc
+   * @param bc The block cache instance.
+   * @param cc Cache config.
    * @param doubling If true, addition of element ups counter by 2, not 1, because element added
    * to onheap and offheap caches.
    * @param sizing True if we should run sizing test (doesn't always apply).
    */
-  void basicBlockCacheOps(final CacheConfig cc, final boolean doubling,
+  void basicBlockCacheOps(final BlockCache bc, final CacheConfig cc, final boolean doubling,
       final boolean sizing) {
-    assertTrue(cc.isBlockCacheEnabled());
     assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory());
-    BlockCache bc = cc.getBlockCache();
     BlockCacheKey bck = new BlockCacheKey("f", 0);
     Cacheable c = new DataCacheEntry();
     // Do asserts on block counting.
     long initialBlockCount = bc.getBlockCount();
     bc.cacheBlock(bck, c, cc.isInMemory());
-    assertEquals(doubling? 2: 1, bc.getBlockCount() - initialBlockCount);
+    assertEquals(doubling ? 2 : 1, bc.getBlockCount() - initialBlockCount);
     bc.evictBlock(bck);
     assertEquals(initialBlockCount, bc.getBlockCount());
     // Do size accounting.  Do it after the above 'warm-up' because it looks like some
@@ -209,7 +200,6 @@ public class TestCacheConfig {
   @Test
   public void testDisableCacheDataBlock() throws IOException {
     Configuration conf = HBaseConfiguration.create();
-    CacheConfig.instantiateBlockCache(conf);
     CacheConfig cacheConfig = new CacheConfig(conf);
     assertTrue(cacheConfig.shouldCacheBlockOnRead(BlockCategory.DATA));
     assertFalse(cacheConfig.shouldCacheCompressed(BlockCategory.DATA));
@@ -260,7 +250,7 @@ public class TestCacheConfig {
     HColumnDescriptor family = new HColumnDescriptor("testDisableCacheDataBlock");
     family.setBlockCacheEnabled(false);
 
-    cacheConfig = new CacheConfig(conf, family);
+    cacheConfig = new CacheConfig(conf, family, null);
     assertFalse(cacheConfig.shouldCacheBlockOnRead(BlockCategory.DATA));
     assertFalse(cacheConfig.shouldCacheCompressed(BlockCategory.DATA));
     assertFalse(cacheConfig.shouldCacheDataCompressed());
@@ -275,12 +265,11 @@ public class TestCacheConfig {
 
   @Test
   public void testCacheConfigDefaultLRUBlockCache() {
-    CacheConfig.instantiateBlockCache(this.conf);
     CacheConfig cc = new CacheConfig(this.conf);
-    assertTrue(cc.isBlockCacheEnabled());
     assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory());
-    basicBlockCacheOps(cc, false, true);
-    assertTrue(cc.getBlockCache() instanceof LruBlockCache);
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
+    basicBlockCacheOps(blockCache, cc, false, true);
+    assertTrue(blockCache instanceof LruBlockCache);
   }
 
   /**
@@ -309,18 +298,18 @@ public class TestCacheConfig {
   private void doBucketCacheConfigTest() {
     final int bcSize = 100;
     this.conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, bcSize);
-    CacheConfig.instantiateBlockCache(this.conf);
     CacheConfig cc = new CacheConfig(this.conf);
-    basicBlockCacheOps(cc, false, false);
-    assertTrue(cc.getBlockCache() instanceof CombinedBlockCache);
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
+    basicBlockCacheOps(blockCache, cc, false, false);
+    assertTrue(blockCache instanceof CombinedBlockCache);
     // TODO: Assert sizes allocated are right and proportions.
-    CombinedBlockCache cbc = (CombinedBlockCache)cc.getBlockCache();
-    BlockCache [] bcs = cbc.getBlockCaches();
+    CombinedBlockCache cbc = (CombinedBlockCache) blockCache;
+    BlockCache[] bcs = cbc.getBlockCaches();
     assertTrue(bcs[0] instanceof LruBlockCache);
-    LruBlockCache lbc = (LruBlockCache)bcs[0];
+    LruBlockCache lbc = (LruBlockCache) bcs[0];
     assertEquals(MemorySizeUtil.getOnHeapCacheSize(this.conf), lbc.getMaxSize());
     assertTrue(bcs[1] instanceof BucketCache);
-    BucketCache bc = (BucketCache)bcs[1];
+    BucketCache bc = (BucketCache) bcs[1];
     // getMaxSize comes back in bytes but we specified size in MB
     assertEquals(bcSize, bc.getMaxSize() / (1024 * 1024));
   }
@@ -341,12 +330,12 @@ public class TestCacheConfig {
     long bcExpectedSize = 100 * 1024 * 1024; // MB.
     assertTrue(lruExpectedSize < bcExpectedSize);
     this.conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, bcSize);
-    CacheConfig.instantiateBlockCache(this.conf);
     CacheConfig cc = new CacheConfig(this.conf);
-    basicBlockCacheOps(cc, false, false);
-    assertTrue(cc.getBlockCache() instanceof CombinedBlockCache);
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
+    basicBlockCacheOps(blockCache, cc, false, false);
+    assertTrue(blockCache instanceof CombinedBlockCache);
     // TODO: Assert sizes allocated are right and proportions.
-    CombinedBlockCache cbc = (CombinedBlockCache)cc.getBlockCache();
+    CombinedBlockCache cbc = (CombinedBlockCache) blockCache;
     LruBlockCache lbc = cbc.onHeapCache;
     assertEquals(lruExpectedSize, lbc.getMaxSize());
     BlockCache bc = cbc.l2Cache;
@@ -382,10 +371,10 @@ public class TestCacheConfig {
   public void testL2CacheWithInvalidBucketSize() {
     Configuration c = new Configuration(this.conf);
     c.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
-    c.set(CacheConfig.BUCKET_CACHE_BUCKETS_KEY, "256,512,1024,2048,4000,4096");
+    c.set(BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY, "256,512,1024,2048,4000,4096");
     c.setFloat(HConstants.BUCKET_CACHE_SIZE_KEY, 1024);
     try {
-      CacheConfig.getBucketCache(c);
+      BlockCacheFactory.createBlockCache(c);
       fail("Should throw IllegalArgumentException when passing illegal value for bucket size");
     } catch (IllegalArgumentException e) {
     }

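The TestCacheConfig hunks above replace the static CacheConfig.instantiateBlockCache(conf) / cc.getBlockCache() pairing with an explicit BlockCacheFactory.createBlockCache(conf) call that the tests then thread through basicBlockCacheOps. A minimal sketch of the post-refactor shape, using only constructors and methods that appear in this patch (the class name, key name, and printouts are illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class BlockCacheFactorySketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // The cache is no longer a JVM-wide singleton hanging off CacheConfig;
        // whoever needs one creates it, or is handed one, explicitly.
        BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
        // Built without a cache, this CacheConfig is a policy-only view;
        // its getBlockCache() would come back empty here.
        CacheConfig cc = new CacheConfig(conf);
        System.out.println("in-memory default: " + cc.isInMemory());
        // Block counting and eviction go through the cache reference.
        BlockCacheKey key = new BlockCacheKey("f", 0);
        System.out.println("blocks cached: " + blockCache.getBlockCount());
        blockCache.evictBlock(key); // safe no-op; nothing was cached under this key
      }
    }
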
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 9c2f6df..4163d55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,10 +40,11 @@ import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.fs.HFileSystem;
@@ -51,7 +53,6 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -160,8 +161,7 @@ public class TestCacheOnWrite {
     Configuration conf = TEST_UTIL.getConfiguration();
     List<BlockCache> blockcaches = new ArrayList<>();
     // default
-    CacheConfig.instantiateBlockCache(conf);
-    blockcaches.add(new CacheConfig(conf).getBlockCache());
+    blockcaches.add(BlockCacheFactory.createBlockCache(conf));
 
     //set LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME to 2.0f due to HBASE-16287
     TEST_UTIL.getConfiguration().setFloat(LruBlockCache.LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, 2.0f);
@@ -224,16 +224,16 @@ public class TestCacheOnWrite {
     conf = TEST_UTIL.getConfiguration();
     this.conf.set("dfs.datanode.data.dir.perm", "700");
     conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_BLOCK_SIZE);
-    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
-        BLOOM_BLOCK_SIZE);
+    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE);
     conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData);
     cowType.modifyConf(conf);
+    conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.DATA));
+    conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
+        cowType.shouldBeCached(BlockType.LEAF_INDEX));
+    conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
+        cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
+    cacheConf = new CacheConfig(conf, blockCache);
     fs = HFileSystem.get(conf);
-    cacheConf =
-        new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
-        cowType.shouldBeCached(BlockType.LEAF_INDEX),
-        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
-            false, false);
   }
 
   @After
@@ -414,13 +414,11 @@ public class TestCacheOnWrite {
     final String cf = "myCF";
     final byte[] cfBytes = Bytes.toBytes(cf);
     final int maxVersions = 3;
-    HRegion region = TEST_UTIL.createTestRegion(table,
-        new HColumnDescriptor(cf)
-            .setCompressionType(compress)
-            .setBloomFilterType(BLOOM_TYPE)
-            .setMaxVersions(maxVersions)
-            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
-    );
+    ColumnFamilyDescriptor cfd =
+        ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress)
+            .setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions)
+            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build();
+    HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache);
     int rowIdx = 0;
     long ts = EnvironmentEdgeManager.currentTime();
     for (int iFile = 0; iFile < 5; ++iFile) {

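In the setUp() hunk above, the wide CacheConfig(blockCache, true, true, ...) constructor disappears: each cache-on-write dimension becomes a boolean Configuration key, and the cache itself is passed to the two-argument CacheConfig(conf, blockCache). A hedged sketch of that pattern, with arbitrary flag values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class CacheOnWriteConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // One boolean key per block category instead of positional booleans.
        conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, true);        // data
        conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true);  // leaf index
        conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, false); // bloom chunks
        // The BlockCache is injected rather than resolved from a static field.
        BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
        CacheConfig cacheConf = new CacheConfig(conf, blockCache);
        System.out.println("data blocks cached on write: "
            + cacheConf.shouldCacheDataOnWrite());
      }
    }
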
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
index 5612c1b..fd39f48 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
@@ -25,14 +25,14 @@ import java.util.Arrays;
 import java.util.Collection;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -103,24 +103,20 @@ public class TestForceCacheImportantBlocks {
 
   @Before
   public void setup() {
-    // Make sure we make a new one each time.
-    CacheConfig.clearGlobalInstances();
     HFile.DATABLOCK_READ_COUNT.reset();
-    CacheConfig.instantiateBlockCache(TEST_UTIL.getConfiguration());
   }
 
   @Test
   public void testCacheBlocks() throws IOException {
     // Set index block size to be the same as normal block size.
     TEST_UTIL.getConfiguration().setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, BLOCK_SIZE);
-    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(CF)).setMaxVersions(MAX_VERSIONS).
-      setCompressionType(COMPRESSION_ALGORITHM).
-      setBloomFilterType(BLOOM_TYPE);
-    hcd.setBlocksize(BLOCK_SIZE);
-    hcd.setBlockCacheEnabled(cfCacheEnabled);
-    HRegion region = TEST_UTIL.createTestRegion(TABLE, hcd);
-    BlockCache cache = region.getStore(hcd.getName()).getCacheConfig().getBlockCache();
-    CacheStats stats = cache.getStats();
+    BlockCache blockCache = BlockCacheFactory.createBlockCache(TEST_UTIL.getConfiguration());
+    ColumnFamilyDescriptor cfd =
+        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(CF)).setMaxVersions(MAX_VERSIONS)
+            .setCompressionType(COMPRESSION_ALGORITHM).setBloomFilterType(BLOOM_TYPE)
+            .setBlocksize(BLOCK_SIZE).setBlockCacheEnabled(cfCacheEnabled).build();
+    HRegion region = TEST_UTIL.createTestRegion(TABLE, cfd, blockCache);
+    CacheStats stats = blockCache.getStats();
     writeTestData(region);
     assertEquals(0, stats.getHitCount());
     assertEquals(0, HFile.DATABLOCK_READ_COUNT.sum());

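Two changes land in testCacheBlocks() above: the column family is built with ColumnFamilyDescriptorBuilder instead of the deprecated HColumnDescriptor, and cache statistics are read straight off the injected BlockCache rather than dug out via region.getStore(...).getCacheConfig(). A sketch of the descriptor-and-stats half (family name and block size are illustrative; createTestRegion needs the HBase test utility, so it stays a comment):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.io.hfile.CacheStats;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ForceCacheSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder
            .newBuilder(Bytes.toBytes("myCF"))  // illustrative family name
            .setBlocksize(16 * 1024)            // illustrative block size
            .setBlockCacheEnabled(false)        // read caching off for this family
            .build();
        // HRegion region = TEST_UTIL.createTestRegion(TABLE, cfd, blockCache);
        // Stats come from the cache we created, not from the store's CacheConfig.
        CacheStats stats = blockCache.getStats();
        System.out.println(cfd.getNameAsString() + " hits: " + stats.getHitCount());
      }
    }
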
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index 7053fce..2a613de 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -86,13 +86,14 @@ public class TestHFile  {
     TEST_UTIL.getDataTestDir("TestHFile").toString();
   private final int minBlockSize = 512;
   private static String localFormatter = "%010d";
-  private static CacheConfig cacheConf = null;
+  private static CacheConfig cacheConf;
   private static Configuration conf ;
   private static FileSystem fs;
 
   @BeforeClass
   public static void setUp() throws Exception {
     conf = TEST_UTIL.getConfiguration();
+    cacheConf = new CacheConfig(conf);
     fs = TEST_UTIL.getTestFileSystem();
   }
 
@@ -162,7 +163,6 @@ public class TestHFile  {
    */
   @Test
   public void testEmptyHFile() throws IOException {
-    if (cacheConf == null) cacheConf = new CacheConfig(conf);
     Path f = new Path(ROOT_DIR, testName.getMethodName());
     HFileContext context = new HFileContextBuilder().withIncludesTags(false).build();
     Writer w =
@@ -179,7 +179,6 @@ public class TestHFile  {
    */
   @Test
   public void testCorrupt0LengthHFile() throws IOException {
-    if (cacheConf == null) cacheConf = new CacheConfig(conf);
     Path f = new Path(ROOT_DIR, testName.getMethodName());
     FSDataOutputStream fsos = fs.create(f);
     fsos.close();
@@ -213,7 +212,6 @@ public class TestHFile  {
    */
   @Test
   public void testCorruptTruncatedHFile() throws IOException {
-    if (cacheConf == null) cacheConf = new CacheConfig(conf);
     Path f = new Path(ROOT_DIR, testName.getMethodName());
     HFileContext  context = new HFileContextBuilder().build();
     Writer w = HFile.getWriterFactory(conf, cacheConf).withPath(this.fs, f)
@@ -315,7 +313,6 @@ public class TestHFile  {
     if (useTags) {
       conf.setInt("hfile.format.version", 3);
     }
-    if (cacheConf == null) cacheConf = new CacheConfig(conf);
     Path  ncHFile = new Path(ROOT_DIR, "basic.hfile." + codec.toString() + useTags);
     FSDataOutputStream fout = createFSOutput(ncHFile);
     HFileContext meta = new HFileContextBuilder()
@@ -411,7 +408,6 @@ public class TestHFile  {
   }
 
   private void metablocks(final String compress) throws Exception {
-    if (cacheConf == null) cacheConf = new CacheConfig(conf);
     Path mFile = new Path(ROOT_DIR, "meta.hfile");
     FSDataOutputStream fout = createFSOutput(mFile);
     HFileContext meta = new HFileContextBuilder()
@@ -445,7 +441,6 @@ public class TestHFile  {
 
   @Test
   public void testNullMetaBlocks() throws Exception {
-    if (cacheConf == null) cacheConf = new CacheConfig(conf);
     for (Compression.Algorithm compressAlgo :
         HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) {
       Path mFile = new Path(ROOT_DIR, "nometa_" + compressAlgo + ".hfile");

http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index a588341..48080b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -32,6 +32,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -246,7 +247,7 @@ public class TestHFileBlock {
   @Test
   public void testNoCompression() throws IOException {
     CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
-    Mockito.when(cacheConf.isBlockCacheEnabled()).thenReturn(false);
+    Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());
 
     HFileBlock block =
       createTestV2Block(NONE, includesMemstoreTS, false).getBlockForCaching(cacheConf);

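With isBlockCacheEnabled() gone, "no block cache" is modelled as an empty Optional coming back from CacheConfig.getBlockCache(), which is exactly what the mock above stubs. A minimal sketch of the pattern; only the surrounding class is scaffolding:

    import java.util.Optional;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.mockito.Mockito;

    public class NoCacheMockSketch {
      public static void main(String[] args) {
        CacheConfig cacheConf = Mockito.mock(CacheConfig.class);
        // Optional.empty() replaces the old isBlockCacheEnabled() == false.
        Mockito.when(cacheConf.getBlockCache()).thenReturn(Optional.empty());
        // Callers branch on presence instead of a boolean flag:
        System.out.println("cache enabled: " + cacheConf.getBlockCache().isPresent());
      }
    }
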
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index efe76aa..78f8584 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -530,9 +530,8 @@ public class TestHFileBlockIndex {
     conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, maxChunkSize);
     // should open hfile.block.index.cacheonwrite
     conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, true);
-    CacheConfig.instantiateBlockCache(conf);
-    CacheConfig cacheConf = new CacheConfig(conf);
-    BlockCache blockCache = cacheConf.getBlockCache();
+    CacheConfig cacheConf = new CacheConfig(conf, BlockCacheFactory.createBlockCache(conf));
+    BlockCache blockCache = cacheConf.getBlockCache().get();
     // Evict all blocks that were cached-on-write by the previous invocation.
     blockCache.evictBlocksByHfileName(hfilePath.getName());
     // Write the HFile
@@ -589,9 +588,8 @@ public class TestHFileBlockIndex {
   public void testHFileWriterAndReader() throws IOException {
     Path hfilePath = new Path(TEST_UTIL.getDataTestDir(),
         "hfile_for_block_index");
-    CacheConfig.instantiateBlockCache(conf);
-    CacheConfig cacheConf = new CacheConfig(conf);
-    BlockCache blockCache = cacheConf.getBlockCache();
+    CacheConfig cacheConf = new CacheConfig(conf, BlockCacheFactory.createBlockCache(conf));
+    BlockCache blockCache = cacheConf.getBlockCache().get();
 
     for (int testI = 0; testI < INDEX_CHUNK_SIZES.length; ++testI) {
       int indexBlockSize = INDEX_CHUNK_SIZES[testI];

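Because getBlockCache() now returns an Optional<BlockCache>, tests that just supplied a cache can call .get() directly, as both hunks above do; code that might legitimately run cache-less would guard the access instead. A short sketch of both forms (the file name passed to the eviction call is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    public class OptionalCacheSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        CacheConfig cacheConf =
            new CacheConfig(conf, BlockCacheFactory.createBlockCache(conf));
        // Safe here: a cache was just handed in, so get() cannot be empty.
        long blocks = cacheConf.getBlockCache().get().getBlockCount();
        System.out.println("block count: " + blocks);
        // Defensive form for code paths that may have no cache at all:
        cacheConf.getBlockCache()
            .ifPresent(bc -> bc.evictBlocksByHfileName("some_hfile"));
      }
    }
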
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
index 4542a3c..5935f91 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
@@ -82,13 +82,11 @@ public class TestLazyDataBlockDecompression {
 
   @Before
   public void setUp() throws IOException {
-    CacheConfig.clearGlobalInstances();
     fs = FileSystem.get(TEST_UTIL.getConfiguration());
   }
 
   @After
   public void tearDown() {
-    CacheConfig.clearGlobalInstances();
     fs = null;
   }
 
@@ -159,12 +157,11 @@ public class TestLazyDataBlockDecompression {
     lazyCompressDisabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
     lazyCompressDisabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
     lazyCompressDisabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
-    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
-      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled);
-    CacheConfig cc = new CacheConfig(lazyCompressDisabled);
+    CacheConfig cc = new CacheConfig(lazyCompressDisabled,
+        new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
     assertFalse(cc.shouldCacheDataCompressed());
-    assertTrue(cc.getBlockCache() instanceof LruBlockCache);
-    LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache();
+    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
+    LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
     LOG.info("disabledBlockCache=" + disabledBlockCache);
     assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize());
     assertTrue("eviction thread spawned unintentionally.",
@@ -194,12 +191,11 @@ public class TestLazyDataBlockDecompression {
     lazyCompressEnabled.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
     lazyCompressEnabled.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cacheOnWrite);
     lazyCompressEnabled.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
-    CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE =
-      new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled);
-    cc = new CacheConfig(lazyCompressEnabled);
+    cc = new CacheConfig(lazyCompressEnabled,
+        new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressEnabled));
     assertTrue("test improperly configured.", cc.shouldCacheDataCompressed());
-    assertTrue(cc.getBlockCache() instanceof LruBlockCache);
-    LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache();
+    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
+    LruBlockCache enabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
     LOG.info("enabledBlockCache=" + enabledBlockCache);
     assertEquals("test inconsistency detected", maxSize, enabledBlockCache.getMaxSize());
     assertTrue("eviction thread spawned unintentionally.",

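Rather than assigning CacheConfig.GLOBAL_BLOCK_CACHE_INSTANCE, the test now constructs the exact LruBlockCache it wants and passes it to CacheConfig, so the lazy-enabled and lazy-disabled configs each own a private cache. A sketch with an illustrative capacity; the third constructor argument is false so no eviction thread is spawned, matching the test's assertion:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.io.hfile.LruBlockCache;

    public class ExplicitLruCacheSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        long maxSize = 8 * 1024 * 1024; // illustrative capacity in bytes
        // Locally owned cache; no global field to reset between test methods.
        LruBlockCache cache =
            new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, conf);
        CacheConfig cc = new CacheConfig(conf, cache);
        System.out.println("max size: " + cache.getMaxSize());
        System.out.println("cache compressed data: " + cc.shouldCacheDataCompressed());
      }
    }
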
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 811df14..9986bba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Random;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -28,8 +30,9 @@ import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -57,24 +60,25 @@ public class TestPrefetch {
   private Configuration conf;
   private CacheConfig cacheConf;
   private FileSystem fs;
+  private BlockCache blockCache;
 
   @Before
   public void setUp() throws IOException {
     conf = TEST_UTIL.getConfiguration();
     conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
     fs = HFileSystem.get(conf);
-    CacheConfig.blockCacheDisabled = false;
-    CacheConfig.instantiateBlockCache(conf);
-    cacheConf = new CacheConfig(conf);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
   }
 
   @Test
   public void testPrefetchSetInHCDWorks() {
-    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("f"));
-    hcd.setPrefetchBlocksOnOpen(true);
+    ColumnFamilyDescriptor columnFamilyDescriptor =
+        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
+            .build();
     Configuration c = HBaseConfiguration.create();
     assertFalse(c.getBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false));
-    CacheConfig cc = new CacheConfig(c, hcd);
+    CacheConfig cc = new CacheConfig(c, columnFamilyDescriptor, blockCache);
     assertTrue(cc.shouldPrefetchOnOpen());
   }
 
@@ -119,7 +123,7 @@ public class TestPrefetch {
     }
 
     // Check that all of the data blocks were preloaded
-    BlockCache blockCache = cacheConf.getBlockCache();
+    BlockCache blockCache = cacheConf.getBlockCache().get();
     long offset = 0;
     while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
       HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);

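The three-argument CacheConfig(conf, family, blockCache) keeps the per-family layering that testPrefetchSetInHCDWorks() exercises: prefetch-on-open ends up true because the ColumnFamilyDescriptor says so, even though the Configuration default is false. A sketch of that override (family name is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
    import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;
    import org.apache.hadoop.hbase.util.Bytes;

    public class FamilyPrefetchSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, false); // site default
        ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder
            .newBuilder(Bytes.toBytes("f"))
            .setPrefetchBlocksOnOpen(true) // family-level override wins
            .build();
        BlockCache blockCache = BlockCacheFactory.createBlockCache(conf);
        CacheConfig cc = new CacheConfig(conf, family, blockCache);
        System.out.println("prefetch on open: " + cc.shouldPrefetchOnOpen()); // true
      }
    }
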
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
index 18e8e70..ad6a0ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerFromBucketCache.java
@@ -25,29 +25,28 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
-import org.apache.hadoop.hbase.wal.WAL;
 import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -88,7 +87,6 @@ public class TestScannerFromBucketCache {
       conf.setFloat("hbase.regionserver.global.memstore.size", 0.1f);
     }
     tableName = TableName.valueOf(name.getMethodName());
-    CacheConfig.instantiateBlockCache(conf);
   }
 
   @After
@@ -96,7 +94,6 @@ public class TestScannerFromBucketCache {
     EnvironmentEdgeManagerTestHelper.reset();
     LOG.info("Cleaning test directory: " + test_util.getDataTestDir());
     test_util.cleanupTestDir();
-    CacheConfig.clearGlobalInstances();
   }
 
   String getName() {
@@ -210,9 +207,7 @@ public class TestScannerFromBucketCache {
       Thread.sleep(500);
       // do the scan again and verify. This time it should be from the bucket cache in offheap mode
       // but one of the cell will be copied due to the asSubByteBuff call
-      Scan scan = new Scan(row1);
-      scan.addFamily(fam1);
-      scan.setMaxVersions(10);
+      Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(10);
       actual = new ArrayList<>();
       InternalScanner scanner = region.getScanner(scan);
 
@@ -290,9 +285,7 @@ public class TestScannerFromBucketCache {
   }
 
   private List<Cell> performScan(byte[] row1, byte[] fam1) throws IOException {
-    Scan scan = new Scan(row1);
-    scan.addFamily(fam1);
-    scan.setMaxVersions(MAX_VERSIONS);
+    Scan scan = new Scan().withStartRow(row1).addFamily(fam1).readVersions(MAX_VERSIONS);
     List<Cell> actual = new ArrayList<>();
     InternalScanner scanner = region.getScanner(scan);
 
@@ -307,32 +300,19 @@ public class TestScannerFromBucketCache {
   }
 
   private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
-      String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
+      String callingMethod, Configuration conf, HBaseTestingUtility testUtil, boolean isReadOnly,
       byte[]... families) throws IOException {
-    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
-    Path logDir = test_util.getDataTestDirOnTestFS(callingMethod + ".log");
-    HRegionInfo hri = new HRegionInfo(tableName, startKey, stopKey);
-    final WAL wal = HBaseTestingUtility.createWal(conf, logDir, hri);
-    return initHRegion(tableName, startKey, stopKey, callingMethod, conf, test_util, isReadOnly,
-      Durability.SYNC_WAL, wal, families);
-  }
-
-  /**
-   * @param tableName
-   * @param startKey
-   * @param stopKey
-   * @param callingMethod
-   * @param conf
-   * @param isReadOnly
-   * @param families
-   * @throws IOException
-   * @return A region on which you must call {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)}
-   *         when done.
-   */
-  private static HRegion initHRegion(TableName tableName, byte[] startKey, byte[] stopKey,
-      String callingMethod, Configuration conf, HBaseTestingUtility test_util, boolean isReadOnly,
-      Durability durability, WAL wal, byte[]... families) throws IOException {
-    return test_util.createLocalHRegion(tableName, startKey, stopKey, isReadOnly, durability, wal,
-      families);
+    RegionInfo regionInfo =
+        RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).setEndKey(stopKey).build();
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.setReadOnly(isReadOnly).setDurability(Durability.SYNC_WAL);
+    for (byte[] family : families) {
+      builder.setColumnFamily(
+          ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(Integer.MAX_VALUE)
+              .build());
+    }
+    return HBaseTestingUtility
+        .createRegionAndWAL(regionInfo, testUtil.getDataTestDir(callingMethod), conf,
+            builder.build(), BlockCacheFactory.createBlockCache(conf));
   }
 }

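Besides the cache wiring, this file also migrates off the deprecated Scan constructor and setters onto their fluent replacements. The mapping, with illustrative row and family bytes:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanApiSketch {
      public static void main(String[] args) {
        byte[] row1 = Bytes.toBytes("row1"); // illustrative
        byte[] fam1 = Bytes.toBytes("fam1"); // illustrative
        // Old: Scan scan = new Scan(row1); scan.addFamily(fam1); scan.setMaxVersions(10);
        Scan scan = new Scan()
            .withStartRow(row1)
            .addFamily(fam1)
            .readVersions(10);
        System.out.println(scan);
      }
    }
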
http://git-wip-us.apache.org/repos/asf/hbase/blob/1971d02e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
index d27b041..ed440e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
@@ -123,10 +123,7 @@ public class TestScannerSelectionUsingKeyRange {
     }
 
     Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("aaz"));
-    CacheConfig.blockCacheDisabled = false;
-    CacheConfig.instantiateBlockCache(conf);
-    CacheConfig cacheConf = new CacheConfig(conf);
-    LruBlockCache cache = (LruBlockCache) cacheConf.getBlockCache();
+    LruBlockCache cache = (LruBlockCache) BlockCacheFactory.createBlockCache(conf);
     cache.clearCache();
     InternalScanner scanner = region.getScanner(scan);
     List<Cell> results = new ArrayList<>();