You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2023/11/17 10:46:50 UTC

(hbase) branch HBASE-27389-rebase updated (9e74cc0d655 -> 254a3222359)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a change to branch HBASE-27389-rebase
in repository https://gitbox.apache.org/repos/asf/hbase.git


    omit 9e74cc0d655 HBASE-27998 Enhance region metrics to include prefetch ratio for each… (#5342)
     add e7f260780e3 HBASE-27998 Enhance region metrics to include prefetch ratio for each… (#5342)
     new 254a3222359 HBASE-27999 Implement cache prefetch aware load balancer (#5376)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (9e74cc0d655)
            \
             N -- N -- N   refs/heads/HBASE-27389-rebase (254a3222359)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../master/balancer/BalancerClusterState.java      | 156 ++++++-
 .../hbase/master/balancer/BalancerRegionLoad.java  |  12 +
 .../hbase/master/balancer/BaseLoadBalancer.java    |   3 +-
 .../master/balancer/CacheAwareLoadBalancer.java    | 479 +++++++++++++++++++++
 .../master/balancer/StochasticLoadBalancer.java    |  54 ++-
 .../hbase/master/balancer/BalancerTestBase.java    |  14 +
 .../balancer/TestStochasticLoadBalancer.java       |   4 +
 .../java/org/apache/hadoop/hbase/HConstants.java   |  12 +
 .../hadoop/hbase/io/hfile/BlockCacheFactory.java   |  13 +-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |   2 +
 .../balancer/TestCacheAwareLoadBalancer.java       | 397 +++++++++++++++++
 .../TestCacheAwareLoadBalancerCostFunctions.java   | 316 ++++++++++++++
 ...lancerWithStochasticLoadBalancerAsInternal.java |   2 +
 13 files changed, 1429 insertions(+), 35 deletions(-)
 create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java


(hbase) 01/01: HBASE-27999 Implement cache prefetch aware load balancer (#5376)

Posted by wc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch HBASE-27389-rebase
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 254a3222359ee5c1727edce403f3c2da3fb0784f
Author: Rahul Agarkar <ra...@cloudera.com>
AuthorDate: Tue Sep 19 20:30:01 2023 +0530

    HBASE-27999 Implement cache prefetch aware load balancer (#5376)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../master/balancer/BalancerClusterState.java      | 156 ++++++-
 .../hbase/master/balancer/BalancerRegionLoad.java  |  12 +
 .../hbase/master/balancer/BaseLoadBalancer.java    |   3 +-
 .../master/balancer/CacheAwareLoadBalancer.java    | 479 +++++++++++++++++++++
 .../master/balancer/StochasticLoadBalancer.java    |  54 ++-
 .../hbase/master/balancer/BalancerTestBase.java    |  14 +
 .../balancer/TestStochasticLoadBalancer.java       |   4 +
 .../java/org/apache/hadoop/hbase/HConstants.java   |  12 +
 .../hadoop/hbase/io/hfile/BlockCacheFactory.java   |  13 +-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |   2 +
 .../balancer/TestCacheAwareLoadBalancer.java       | 397 +++++++++++++++++
 .../TestCacheAwareLoadBalancerCostFunctions.java   | 316 ++++++++++++++
 ...lancerWithStochasticLoadBalancerAsInternal.java |   2 +
 13 files changed, 1429 insertions(+), 35 deletions(-)

diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
index a7ae8b4d1a5..4b3809c107c 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,6 +115,12 @@ class BalancerClusterState {
   private float[][] rackLocalities;
   // Maps localityType -> region -> [server|rack]Index with highest locality
   private int[][] regionsToMostLocalEntities;
+  // Maps region -> serverIndex -> regionCacheRatio of a region on a server
+  private Map<Pair<Integer, Integer>, Float> regionIndexServerIndexRegionCachedRatio;
+  // Maps regionIndex -> serverIndex with best region cache ratio
+  private int[] regionServerIndexWithBestRegionCachedRatio;
+  // Maps regionName -> oldServerName -> cache ratio of the region on the old server
+  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap;
 
   static class DefaultRackManager extends RackManager {
     @Override
@@ -125,13 +132,20 @@ class BalancerClusterState {
   BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
     Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
     RackManager rackManager) {
-    this(null, clusterState, loads, regionFinder, rackManager);
+    this(null, clusterState, loads, regionFinder, rackManager, null);
+  }
+
+  protected BalancerClusterState(Map<ServerName, List<RegionInfo>> clusterState,
+    Map<String, Deque<BalancerRegionLoad>> loads, RegionHDFSBlockLocationFinder regionFinder,
+    RackManager rackManager, Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
+    this(null, clusterState, loads, regionFinder, rackManager, oldRegionServerRegionCacheRatio);
   }
 
   @SuppressWarnings("unchecked")
   BalancerClusterState(Collection<RegionInfo> unassignedRegions,
     Map<ServerName, List<RegionInfo>> clusterState, Map<String, Deque<BalancerRegionLoad>> loads,
-    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) {
+    RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager,
+    Map<String, Pair<ServerName, Float>> oldRegionServerRegionCacheRatio) {
     if (unassignedRegions == null) {
       unassignedRegions = Collections.emptyList();
     }
@@ -145,6 +159,8 @@ class BalancerClusterState {
     tables = new ArrayList<>();
     this.rackManager = rackManager != null ? rackManager : new DefaultRackManager();
 
+    this.regionCacheRatioOnOldServerMap = oldRegionServerRegionCacheRatio;
+
     numRegions = 0;
 
     List<List<Integer>> serversPerHostList = new ArrayList<>();
@@ -541,6 +557,142 @@ class BalancerClusterState {
 
   }
 
+  /**
+   * Returns the size of hFiles from the most recent RegionLoad for region
+   */
+  public int getTotalRegionHFileSizeMB(int region) {
+    Deque<BalancerRegionLoad> load = regionLoads[region];
+    if (load == null) {
+      // This means, that the region has no actual data on disk
+      return 0;
+    }
+    return regionLoads[region].getLast().getRegionSizeMB();
+  }
+
+  /**
+   * Returns the weighted cache ratio of a region on the given region server
+   */
+  public float getOrComputeWeightedRegionCacheRatio(int region, int server) {
+    return getTotalRegionHFileSizeMB(region) * getOrComputeRegionCacheRatio(region, server);
+  }
+
+  /**
+   * Returns the amount by which a region is cached on a given region server. If the region is not
+   * currently hosted on the given region server, then find out if it was previously hosted there
+   * and return the old cache ratio.
+   */
+  protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
+    float regionCacheRatio = 0.0f;
+
+    // Get the current region cache ratio if the region is hosted on the server regionServerIndex
+    for (int regionIndex : regionsPerServer[regionServerIndex]) {
+      if (region != regionIndex) {
+        continue;
+      }
+
+      Deque<BalancerRegionLoad> regionLoadList = regionLoads[regionIndex];
+
+      // The region is currently hosted on this region server. Get the region cache ratio for this
+      // region on this server
+      regionCacheRatio =
+        regionLoadList == null ? 0.0f : regionLoadList.getLast().getCurrentRegionCacheRatio();
+
+      return regionCacheRatio;
+    }
+
+    // Region is not currently hosted on this server. Check if the region was cached on this
+    // server earlier. This can happen when the server was shut down and the cache was persisted.
+    // Search using the region name and server name and not the index id and server id as these ids
+    // may change when a server is marked as dead or a new server is added.
+    String regionEncodedName = regions[region].getEncodedName();
+    ServerName serverName = servers[regionServerIndex];
+    if (
+      regionCacheRatioOnOldServerMap != null
+        && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
+    ) {
+      Pair<ServerName, Float> cacheRatioOfRegionOnServer =
+        regionCacheRatioOnOldServerMap.get(regionEncodedName);
+      if (ServerName.isSameAddress(cacheRatioOfRegionOnServer.getFirst(), serverName)) {
+        regionCacheRatio = cacheRatioOfRegionOnServer.getSecond();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Old cache ratio found for region {} on server {}: {}", regionEncodedName,
+            serverName, regionCacheRatio);
+        }
+      }
+    }
+    return regionCacheRatio;
+  }
+
+  /**
+   * Populate the maps containing information about how much a region is cached on a region server.
+   */
+  private void computeRegionServerRegionCacheRatio() {
+    regionIndexServerIndexRegionCachedRatio = new HashMap<>();
+    regionServerIndexWithBestRegionCachedRatio = new int[numRegions];
+
+    for (int region = 0; region < numRegions; region++) {
+      float bestRegionCacheRatio = 0.0f;
+      int serverWithBestRegionCacheRatio = 0;
+      for (int server = 0; server < numServers; server++) {
+        float regionCacheRatio = getRegionCacheRatioOnRegionServer(region, server);
+        if (regionCacheRatio > 0.0f || server == regionIndexToServerIndex[region]) {
+          // A cache ratio of 0 on a server carries no information. Hence, record the ratio only
+          // if it is greater than 0 or the server is the one currently hosting the region.
+          Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
+          regionIndexServerIndexRegionCachedRatio.put(regionServerPair, regionCacheRatio);
+        }
+        if (regionCacheRatio > bestRegionCacheRatio) {
+          serverWithBestRegionCacheRatio = server;
+          // If the server currently hosting the region has equal cache ratio to a historical
+          // server, consider the current server to keep hosting the region
+          bestRegionCacheRatio = regionCacheRatio;
+        } else if (
+          regionCacheRatio == bestRegionCacheRatio && server == regionIndexToServerIndex[region]
+        ) {
+          // If two servers have same region cache ratio, then the server currently hosting the
+          // region
+          // should retain the region
+          serverWithBestRegionCacheRatio = server;
+        }
+      }
+      regionServerIndexWithBestRegionCachedRatio[region] = serverWithBestRegionCacheRatio;
+      Pair<Integer, Integer> regionServerPair =
+        new Pair<>(region, regionIndexToServerIndex[region]);
+      float tempRegionCacheRatio = regionIndexServerIndexRegionCachedRatio.get(regionServerPair);
+      if (tempRegionCacheRatio > bestRegionCacheRatio) {
+        LOG.warn(
+          "INVALID CONDITION: region {} on server {} cache ratio {} is greater than the "
+            + "best region cache ratio {} on server {}",
+          regions[region].getEncodedName(), servers[regionIndexToServerIndex[region]],
+          tempRegionCacheRatio, bestRegionCacheRatio, servers[serverWithBestRegionCacheRatio]);
+      }
+    }
+  }
+
+  protected float getOrComputeRegionCacheRatio(int region, int server) {
+    if (
+      regionServerIndexWithBestRegionCachedRatio == null
+        || regionIndexServerIndexRegionCachedRatio.isEmpty()
+    ) {
+      computeRegionServerRegionCacheRatio();
+    }
+
+    Pair<Integer, Integer> regionServerPair = new Pair<>(region, server);
+    return regionIndexServerIndexRegionCachedRatio.containsKey(regionServerPair)
+      ? regionIndexServerIndexRegionCachedRatio.get(regionServerPair)
+      : 0.0f;
+  }
+
+  public int[] getOrComputeServerWithBestRegionCachedRatio() {
+    if (
+      regionServerIndexWithBestRegionCachedRatio == null
+        || regionIndexServerIndexRegionCachedRatio.isEmpty()
+    ) {
+      computeRegionServerRegionCacheRatio();
+    }
+    return regionServerIndexWithBestRegionCachedRatio;
+  }
+
   /**
    * Maps region index to rack index
    */
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java
index ffb36cb8ca1..33d00e3de86 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerRegionLoad.java
@@ -34,6 +34,8 @@ class BalancerRegionLoad {
   private final long writeRequestsCount;
   private final int memStoreSizeMB;
   private final int storefileSizeMB;
+  private final int regionSizeMB;
+  private final float currentRegionPrefetchRatio;
 
   BalancerRegionLoad(RegionMetrics regionMetrics) {
     readRequestsCount = regionMetrics.getReadRequestCount();
@@ -41,6 +43,8 @@ class BalancerRegionLoad {
     writeRequestsCount = regionMetrics.getWriteRequestCount();
     memStoreSizeMB = (int) regionMetrics.getMemStoreSize().get(Size.Unit.MEGABYTE);
     storefileSizeMB = (int) regionMetrics.getStoreFileSize().get(Size.Unit.MEGABYTE);
+    regionSizeMB = (int) regionMetrics.getRegionSizeMB().get(Size.Unit.MEGABYTE);
+    currentRegionPrefetchRatio = regionMetrics.getCurrentRegionCachedRatio();
   }
 
   public long getReadRequestsCount() {
@@ -62,4 +66,12 @@ class BalancerRegionLoad {
   public int getStorefileSizeMB() {
     return storefileSizeMB;
   }
+
+  public int getRegionSizeMB() {
+    return regionSizeMB;
+  }
+
+  public float getCurrentRegionCacheRatio() {
+    return currentRegionPrefetchRatio;
+  }
 }
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index a4560cc595a..54516868a0a 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -232,7 +232,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         clusterState.put(server, Collections.emptyList());
       }
     }
-    return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager);
+    return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager,
+      null);
   }
 
   private List<ServerName> findIdleServers(List<ServerName> servers) {
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
new file mode 100644
index 00000000000..d73769a3971
--- /dev/null
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+/** An implementation of the {@link org.apache.hadoop.hbase.master.LoadBalancer} that assigns regions
+ * based on the amount they are cached on a given server. A region can move across the region
+ * servers whenever a region server shuts down or crashes. The region server preserves the cache
+ * periodically and restores the cache when it is restarted. This balancer implements a mechanism
+ * where it maintains the amount by which a region is cached on a region server. During balancer
+ * run, a region plan is generated that takes into account this cache information and tries to
+ * move the regions so that the cache is minimally impacted.
+ */
+
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
+  private static final Logger LOG = LoggerFactory.getLogger(CacheAwareLoadBalancer.class);
+
+  private Configuration configuration;
+
+  public enum GeneratorFunctionType {
+    LOAD,
+    CACHE_RATIO
+  }
+
+  @Override
+  public synchronized void loadConf(Configuration configuration) {
+    this.configuration = configuration;
+    this.costFunctions = new ArrayList<>();
+    super.loadConf(configuration);
+  }
+
+  @Override
+  protected List<CandidateGenerator> createCandidateGenerators() {
+    List<CandidateGenerator> candidateGenerators = new ArrayList<>(2);
+    candidateGenerators.add(GeneratorFunctionType.LOAD.ordinal(),
+      new CacheAwareSkewnessCandidateGenerator());
+    candidateGenerators.add(GeneratorFunctionType.CACHE_RATIO.ordinal(),
+      new CacheAwareCandidateGenerator());
+    return candidateGenerators;
+  }
+
+  @Override
+  protected List<CostFunction> createCostFunctions(Configuration configuration) {
+    List<CostFunction> costFunctions = new ArrayList<>();
+    addCostFunction(costFunctions, new CacheAwareRegionSkewnessCostFunction(configuration));
+    addCostFunction(costFunctions, new CacheAwareCostFunction(configuration));
+    return costFunctions;
+  }
+
+  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
+    if (costFunction.getMultiplier() > 0) {
+      costFunctions.add(costFunction);
+    }
+  }
+
+  @Override
+  public void updateClusterMetrics(ClusterMetrics clusterMetrics) {
+    this.clusterStatus = clusterMetrics;
+    updateRegionLoad();
+  }
+
+  /**
+   * Collect the amount of region cached for all the regions from all the active region servers.
+   */
+  private void updateRegionLoad() {
+    loads = new HashMap<>();
+    regionCacheRatioOnOldServerMap = new HashMap<>();
+    Map<String, Pair<ServerName, Integer>> regionCacheRatioOnCurrentServerMap = new HashMap<>();
+
+    // Build current region cache statistics
+    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
+      // Create a map of region and the server where it is currently hosted
+      sm.getRegionMetrics().forEach((byte[] regionName, RegionMetrics rm) -> {
+        String regionEncodedName = RegionInfo.encodeRegionName(regionName);
+
+        Deque<BalancerRegionLoad> rload = new ArrayDeque<>();
+
+        // Get the total size of the hFiles in this region
+        int regionSizeMB = (int) rm.getRegionSizeMB().get(Size.Unit.MEGABYTE);
+
+        rload.add(new BalancerRegionLoad(rm));
+        // Maintain a map of region and its total size. This is needed to calculate the cache
+        // ratios for the regions cached on old region servers
+        regionCacheRatioOnCurrentServerMap.put(regionEncodedName, new Pair<>(sn, regionSizeMB));
+        loads.put(regionEncodedName, rload);
+      });
+    });
+
+    // Build cache statistics for the regions hosted previously on old region servers
+    clusterStatus.getLiveServerMetrics().forEach((ServerName sn, ServerMetrics sm) -> {
+      // Find if a region was previously hosted on a server other than the one it is currently
+      // hosted on.
+      sm.getRegionCachedInfo().forEach((String regionEncodedName, Integer regionSizeInCache) -> {
+        // If the region is found in regionCacheRatioOnCurrentServerMap, it is currently hosted on
+        // some live server; the check below tells whether that is a server other than this one
+        if (regionCacheRatioOnCurrentServerMap.containsKey(regionEncodedName)) {
+          ServerName currentServer =
+            regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getFirst();
+          if (!ServerName.isSameAddress(currentServer, sn)) {
+            int regionSizeMB =
+              regionCacheRatioOnCurrentServerMap.get(regionEncodedName).getSecond();
+            float regionCacheRatioOnOldServer =
+              regionSizeMB == 0 ? 0.0f : (float) regionSizeInCache / regionSizeMB;
+            regionCacheRatioOnOldServerMap.put(regionEncodedName,
+              new Pair<>(sn, regionCacheRatioOnOldServer));
+          }
+        }
+      });
+    });
+  }
+
+  private RegionInfo getRegionInfoByEncodedName(BalancerClusterState cluster, String regionName) {
+    Optional<RegionInfo> regionInfoOptional =
+      Arrays.stream(cluster.regions).filter((RegionInfo ri) -> {
+        return regionName.equals(ri.getEncodedName());
+      }).findFirst();
+
+    if (regionInfoOptional.isPresent()) {
+      return regionInfoOptional.get();
+    }
+    return null;
+  }
+
+  private class CacheAwareCandidateGenerator extends CandidateGenerator {
+    @Override
+    protected BalanceAction generate(BalancerClusterState cluster) {
+      // Move the regions to the servers they were previously hosted on based on the cache ratio
+      if (
+        !regionCacheRatioOnOldServerMap.isEmpty()
+          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
+      ) {
+        Map.Entry<String, Pair<ServerName, Float>> regionCacheRatioServerMap =
+          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
+        // Get the server where this region was previously hosted
+        String regionEncodedName = regionCacheRatioServerMap.getKey();
+        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
+        if (regionInfo == null) {
+          LOG.warn("Region {} not found", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        int regionIndex = cluster.regionsToIndex.get(regionInfo);
+        int oldServerIndex = cluster.serversToIndex
+          .get(regionCacheRatioOnOldServerMap.get(regionEncodedName).getFirst().getAddress());
+        if (oldServerIndex < 0) {
+          LOG.warn("Server previously hosting region {} not found", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+
+        float oldRegionCacheRatio =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, oldServerIndex);
+        int currentServerIndex = cluster.regionIndexToServerIndex[regionIndex];
+        float currentRegionCacheRatio =
+          cluster.getOrComputeRegionCacheRatio(regionIndex, currentServerIndex);
+
+        BalanceAction action = generatePlan(cluster, regionIndex, currentServerIndex,
+          currentRegionCacheRatio, oldServerIndex, oldRegionCacheRatio);
+        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+        return action;
+      }
+      return BalanceAction.NULL_ACTION;
+    }
+
+    private BalanceAction generatePlan(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      return moveRegionToOldServer(cluster, regionIndex, currentServerIndex,
+        cacheRatioOnCurrentServer, oldServerIndex, cacheRatioOnOldServer)
+          ? getAction(currentServerIndex, regionIndex, oldServerIndex, -1)
+          : BalanceAction.NULL_ACTION;
+    }
+
+    private boolean moveRegionToOldServer(BalancerClusterState cluster, int regionIndex,
+      int currentServerIndex, float cacheRatioOnCurrentServer, int oldServerIndex,
+      float cacheRatioOnOldServer) {
+      // Nothing to do if either the current or the old server index is invalid (negative). This
+      // can happen when other candidate generator has moved the region
+      if (currentServerIndex < 0 || oldServerIndex < 0) {
+        return false;
+      }
+
+      float cacheRatioDiffThreshold = 0.6f;
+
+      // Conditions for moving the region
+
+      // If the region is fully cached on the old server, move the region back
+      if (cacheRatioOnOldServer == 1.0f) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Region {} moved to the old server {} as it is fully cached there",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[oldServerIndex]);
+        }
+        return true;
+      }
+
+      // Move the region back to the old server if it is cached equally on both the servers
+      if (cacheRatioOnCurrentServer == cacheRatioOnOldServer) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as the region is cached {} equally on both servers",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
+        }
+        return true;
+      }
+
+      // If the region is not fully cached on either of the servers, move the region back to the
+      // old server if the region cache ratio on the current server is still much less than the old
+      // server
+      if (
+        cacheRatioOnOldServer > 0.0f
+          && cacheRatioOnCurrentServer / cacheRatioOnOldServer < cacheRatioDiffThreshold
+      ) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "Region {} moved from {} to {} as region cache ratio {} is better than the current "
+              + "cache ratio {}",
+            cluster.regions[regionIndex].getEncodedName(), cluster.servers[currentServerIndex],
+            cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+        }
+        return true;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
+          cluster.regions[regionIndex], cluster.servers[currentServerIndex],
+          cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+      }
+      return false;
+    }
+  }
+
+  private class CacheAwareSkewnessCandidateGenerator extends LoadCandidateGenerator {
+    @Override
+    BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
+      // First move all the regions which were hosted previously on some other server back to their
+      // old servers
+      if (
+        !regionCacheRatioOnOldServerMap.isEmpty()
+          && regionCacheRatioOnOldServerMap.entrySet().iterator().hasNext()
+      ) {
+        // Get the first region index in the historical cache ratio list
+        Map.Entry<String, Pair<ServerName, Float>> regionEntry =
+          regionCacheRatioOnOldServerMap.entrySet().iterator().next();
+        String regionEncodedName = regionEntry.getKey();
+
+        RegionInfo regionInfo = getRegionInfoByEncodedName(cluster, regionEncodedName);
+        if (regionInfo == null) {
+          LOG.warn("Region {} does not exist", regionEncodedName);
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+        if (regionInfo.isMetaRegion() || regionInfo.getTable().isSystemTable()) {
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return BalanceAction.NULL_ACTION;
+        }
+
+        int regionIndex = cluster.regionsToIndex.get(regionInfo);
+
+        // Get the index of the server currently hosting this region
+        thisServer = cluster.regionIndexToServerIndex[regionIndex];
+
+        // Get the old server index
+        otherServer = cluster.serversToIndex.get(regionEntry.getValue().getFirst().getAddress());
+
+        regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+
+        if (otherServer < 0) {
+          // The old server has been moved to other host and hence, the region cannot be moved back
+          // to the old server
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "CacheAwareSkewnessCandidateGenerator: Region {} not moved to the old "
+                + "server {} as the server does not exist",
+              regionEncodedName, regionEntry.getValue().getFirst().getHostname());
+          }
+          return BalanceAction.NULL_ACTION;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+            "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it "
+              + "was hosted their earlier",
+            regionEncodedName, cluster.servers[thisServer].getHostname(),
+            cluster.servers[otherServer].getHostname());
+        }
+
+        return getAction(thisServer, regionIndex, otherServer, -1);
+      }
+
+      if (thisServer < 0 || otherServer < 0) {
+        return BalanceAction.NULL_ACTION;
+      }
+
+      int regionIndexToMove = pickLeastCachedRegion(cluster, thisServer);
+      if (regionIndexToMove < 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("CacheAwareSkewnessCandidateGenerator: No region found for movement");
+        }
+        return BalanceAction.NULL_ACTION;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+          "CacheAwareSkewnessCandidateGenerator: Region {} moved from {} to {} as it is "
+            + "least cached on current server",
+          cluster.regions[regionIndexToMove].getEncodedName(),
+          cluster.servers[thisServer].getHostname(), cluster.servers[otherServer].getHostname());
+      }
+      return getAction(thisServer, regionIndexToMove, otherServer, -1);
+    }
+
+    /**
+     * Scans the regions currently assigned to {@code thisServer} and picks the one whose cache
+     * ratio on that server is the lowest.
+     * @param cluster    cluster state supplying the per-server region lists and cache ratios
+     * @param thisServer index of the server whose regions are examined
+     * @return index of the least cached region, or -1 when no region has a ratio below
+     *         {@link Float#MAX_VALUE} (for example when the server hosts no region)
+     */
+    private int pickLeastCachedRegion(BalancerClusterState cluster, int thisServer) {
+      int candidate = -1;
+      float lowestRatio = Float.MAX_VALUE;
+      for (int region : cluster.regionsPerServer[thisServer]) {
+        float ratio = cluster.getOrComputeRegionCacheRatio(region, thisServer);
+        // Strict comparison: on a tie the region encountered first is kept.
+        if (ratio < lowestRatio) {
+          lowestRatio = ratio;
+          candidate = region;
+        }
+      }
+      return candidate;
+    }
+  }
+
+  /**
+   * Cost function over the number of regions hosted by each server. The per-server region counts
+   * are kept in a {@link DoubleArrayCost}, which produces the actual cost value, and the cost is
+   * credited to the {@code LOAD} candidate generator.
+   */
+  static class CacheAwareRegionSkewnessCostFunction extends CostFunction {
+    static final String REGION_COUNT_SKEW_COST_KEY =
+      "hbase.master.balancer.stochastic.regionCountCost";
+    static final float DEFAULT_REGION_COUNT_SKEW_COST = 20;
+    private final DoubleArrayCost cost = new DoubleArrayCost();
+
+    /**
+     * @param conf configuration from which the multiplier is read
+     *             ({@link #REGION_COUNT_SKEW_COST_KEY}, default
+     *             {@link #DEFAULT_REGION_COUNT_SKEW_COST})
+     */
+    CacheAwareRegionSkewnessCostFunction(Configuration conf) {
+      // Load multiplier should be the greatest as it is the most general way to balance data.
+      // NOTE(review): the default used here (20) equals CacheAwareCostFunction's default, so it
+      // is only "the greatest" if configured that way - confirm the intended default.
+      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
+    }
+
+    /** Seeds the per-server cost array with the current region count of every server. */
+    @Override
+    void prepare(BalancerClusterState cluster) {
+      super.prepare(cluster);
+      cost.prepare(cluster.numServers);
+      cost.applyCostsChange(costs -> {
+        for (int i = 0; i < cluster.numServers; i++) {
+          costs[i] = cluster.regionsPerServer[i].length;
+        }
+      });
+    }
+
+    @Override
+    protected double cost() {
+      return cost.cost();
+    }
+
+    /** Refreshes the region counts of the two servers touched by the move. */
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      cost.applyCostsChange(costs -> {
+        costs[oldServer] = cluster.regionsPerServer[oldServer].length;
+        costs[newServer] = cluster.regionsPerServer[newServer].length;
+      });
+    }
+
+    /**
+     * Adds this function's cost to the weight slot of the {@code LOAD} generator.
+     * @param weights per-generator weights, indexed by {@code GeneratorFunctionType} ordinal
+     */
+    @Override
+    public final void updateWeight(double[] weights) {
+      weights[GeneratorFunctionType.LOAD.ordinal()] += cost();
+    }
+  }
+
+  /**
+   * Cost function comparing the weighted cache ratio of every region on its current server with
+   * the weighted cache ratio it has on the server that caches it best. The reported cost is
+   * {@code 1 - (current / best)}, and it is credited to the {@code CACHE_RATIO} candidate
+   * generator.
+   */
+  static class CacheAwareCostFunction extends CostFunction {
+    private static final String CACHE_COST_KEY = "hbase.master.balancer.stochastic.cacheCost";
+    private static final float DEFAULT_CACHE_COST = 20;
+
+    // After prepare(): summed ratio on the current servers divided by bestCacheRatio.
+    private double cacheRatio;
+    // Summed weighted cache ratio of every region on its best-caching server.
+    private double bestCacheRatio;
+
+    /**
+     * @param conf configuration supplying the multiplier; the function is disabled (multiplier 0)
+     *             when no persistent bucket cache path is configured
+     */
+    CacheAwareCostFunction(Configuration conf) {
+      // Disable the CacheAwareCostFunction if the cached file list persistence is not enabled
+      float multiplier;
+      if (conf.get(BUCKET_CACHE_PERSISTENT_PATH_KEY) == null) {
+        multiplier = 0.0f;
+      } else {
+        multiplier = conf.getFloat(CACHE_COST_KEY, DEFAULT_CACHE_COST);
+      }
+      this.setMultiplier(multiplier);
+      bestCacheRatio = 0.0;
+      cacheRatio = 0.0;
+    }
+
+    /** Recomputes both sums from scratch for the given cluster state. */
+    @Override
+    void prepare(BalancerClusterState cluster) {
+      super.prepare(cluster);
+      cacheRatio = 0.0;
+      bestCacheRatio = 0.0;
+
+      for (int regionIdx = 0; regionIdx < cluster.numRegions; regionIdx++) {
+        int hostingServer = cluster.regionIndexToServerIndex[regionIdx];
+        cacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(regionIdx, hostingServer);
+
+        int bestServer = getServerWithBestCacheRatioForRegion(regionIdx);
+        bestCacheRatio += cluster.getOrComputeWeightedRegionCacheRatio(regionIdx, bestServer);
+      }
+
+      // With nothing cached anywhere the ratio is treated as perfect, i.e. the cost becomes zero.
+      if (bestCacheRatio == 0) {
+        cacheRatio = 1.0;
+      } else {
+        cacheRatio = cacheRatio / bestCacheRatio;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("CacheAwareCostFunction: Cost: {}", 1 - cacheRatio);
+      }
+    }
+
+    @Override
+    protected double cost() {
+      return scale(0, 1, 1 - cacheRatio);
+    }
+
+    /**
+     * Adjusts the normalised ratio by the change in the moved region's weighted cache ratio,
+     * avoiding a full recomputation.
+     */
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      double ratioOnOldServer = cluster.getOrComputeWeightedRegionCacheRatio(region, oldServer);
+      double ratioOnNewServer = cluster.getOrComputeWeightedRegionCacheRatio(region, newServer);
+      double ratioChange = ratioOnNewServer - ratioOnOldServer;
+
+      double normalizedDelta = 0.0;
+      if (bestCacheRatio != 0.0) {
+        normalizedDelta = ratioChange / bestCacheRatio;
+      }
+      cacheRatio += normalizedDelta;
+
+      // Only values outside [0, 1] are worth reporting.
+      if (LOG.isDebugEnabled() && (cacheRatio < 0.0 || cacheRatio > 1.0)) {
+        LOG.debug(
+          "CacheAwareCostFunction:regionMoved:region:{}:from:{}:to:{}:regionCacheRatioOnOldServer:{}:"
+            + "regionCacheRatioOnNewServer:{}:bestRegionCacheRatio:{}:cacheRatio:{}",
+          cluster.regions[region].getEncodedName(), cluster.servers[oldServer].getHostname(),
+          cluster.servers[newServer].getHostname(), ratioOnOldServer, ratioOnNewServer,
+          bestCacheRatio, cacheRatio);
+      }
+    }
+
+    /** Looks up the server index on which the given region has its best cache ratio. */
+    private int getServerWithBestCacheRatioForRegion(int region) {
+      int[] bestServerPerRegion = cluster.getOrComputeServerWithBestRegionCachedRatio();
+      return bestServerPerRegion[region];
+    }
+
+    /**
+     * Adds this function's cost to the weight slot of the {@code CACHE_RATIO} generator.
+     * @param weights per-generator weights, indexed by {@code GeneratorFunctionType} ordinal
+     */
+    @Override
+    public final void updateWeight(double[] weights) {
+      weights[GeneratorFunctionType.CACHE_RATIO.ordinal()] += cost();
+    }
+  }
+}
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index edf049e8a71..e5cd5446c5c 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -136,8 +137,10 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   private long maxRunningTime = DEFAULT_MAX_RUNNING_TIME;
   private int numRegionLoadsToRemember = DEFAULT_KEEP_REGION_LOADS;
   private float minCostNeedBalance = DEFAULT_MIN_COST_NEED_BALANCE;
+  Map<String, Pair<ServerName, Float>> regionCacheRatioOnOldServerMap = new HashMap<>();
 
-  private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC
+  protected List<CostFunction> costFunctions; // FindBugs: Wants this protected;
+                                              // IS2_INCONSISTENT_SYNC
   // To save currently configed sum of multiplier. Defaulted at 1 for cases that carry high cost
   private float sumMultiplier;
   // to save and report costs to JMX
@@ -224,6 +227,24 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     return candidateGenerators;
   }
 
+  /**
+   * Assembles the built-in cost functions for this balancer, in evaluation order. Each candidate
+   * goes through {@code addCostFunction}, which keeps it only when its multiplier is greater than
+   * zero.
+   * @param conf configuration handed to the cost functions constructed here
+   * @return a new list holding the enabled cost functions
+   */
+  protected List<CostFunction> createCostFunctions(Configuration conf) {
+    // Named differently from the costFunctions field so the field is not shadowed.
+    List<CostFunction> functions = new ArrayList<>();
+    addCostFunction(functions, new RegionCountSkewCostFunction(conf));
+    addCostFunction(functions, new PrimaryRegionCountSkewCostFunction(conf));
+    addCostFunction(functions, new MoveCostFunction(conf, provider));
+    addCostFunction(functions, localityCost);
+    addCostFunction(functions, rackLocalityCost);
+    addCostFunction(functions, new TableSkewCostFunction(conf));
+    addCostFunction(functions, regionReplicaHostCostFunction);
+    addCostFunction(functions, regionReplicaRackCostFunction);
+    addCostFunction(functions, new ReadRequestCostFunction(conf));
+    addCostFunction(functions, new CPRequestCostFunction(conf));
+    addCostFunction(functions, new WriteRequestCostFunction(conf));
+    addCostFunction(functions, new MemStoreSizeCostFunction(conf));
+    addCostFunction(functions, new StoreFileCostFunction(conf));
+    return functions;
+  }
+
   @Override
   protected void loadConf(Configuration conf) {
     super.loadConf(conf);
@@ -242,20 +263,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
 
     regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
     regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);
-    costFunctions = new ArrayList<>();
-    addCostFunction(new RegionCountSkewCostFunction(conf));
-    addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
-    addCostFunction(new MoveCostFunction(conf, provider));
-    addCostFunction(localityCost);
-    addCostFunction(rackLocalityCost);
-    addCostFunction(new TableSkewCostFunction(conf));
-    addCostFunction(regionReplicaHostCostFunction);
-    addCostFunction(regionReplicaRackCostFunction);
-    addCostFunction(new ReadRequestCostFunction(conf));
-    addCostFunction(new CPRequestCostFunction(conf));
-    addCostFunction(new WriteRequestCostFunction(conf));
-    addCostFunction(new MemStoreSizeCostFunction(conf));
-    addCostFunction(new StoreFileCostFunction(conf));
+    this.costFunctions = createCostFunctions(conf);
     loadCustomCostFunctions(conf);
 
     curFunctionCosts = new double[costFunctions.size()];
@@ -459,8 +467,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     // The clusterState that is given to this method contains the state
     // of all the regions in the table(s) (that's true today)
     // Keep track of servers to iterate through them.
-    BalancerClusterState cluster =
-      new BalancerClusterState(loadOfOneTable, loads, finder, rackManager);
+    BalancerClusterState cluster = new BalancerClusterState(loadOfOneTable, loads, finder,
+      rackManager, regionCacheRatioOnOldServerMap);
 
     long startTime = EnvironmentEdgeManager.currentTime();
 
@@ -568,7 +576,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     return null;
   }
 
-  private void sendRejectionReasonToRingBuffer(Supplier<String> reason,
+  protected void sendRejectionReasonToRingBuffer(Supplier<String> reason,
     List<CostFunction> costFunctions) {
     provider.recordBalancerRejection(() -> {
       BalancerRejection.Builder builder = new BalancerRejection.Builder().setReason(reason.get());
@@ -627,14 +635,14 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
   }
 
-  private void addCostFunction(CostFunction costFunction) {
+  private void addCostFunction(List<CostFunction> costFunctions, CostFunction costFunction) {
     float multiplier = costFunction.getMultiplier();
     if (multiplier > 0) {
       costFunctions.add(costFunction);
     }
   }
 
-  private String functionCost() {
+  protected String functionCost() {
     StringBuilder builder = new StringBuilder();
     for (CostFunction c : costFunctions) {
       builder.append(c.getClass().getSimpleName());
@@ -655,6 +663,12 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     return builder.toString();
   }
 
+  /**
+   * Exposes the cost functions currently configured on this balancer. The returned list is the
+   * internal instance, not a copy.
+   * @return the balancer's cost function list
+   */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
+  List<CostFunction> getCostFunctions() {
+    return costFunctions;
+  }
+
   private String totalCostsPerFunc() {
     StringBuilder builder = new StringBuilder();
     for (CostFunction c : costFunctions) {
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
index 9ea1c94d1e0..4a996e7796f 100644
--- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
+++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
@@ -23,6 +23,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -376,6 +377,19 @@ public class BalancerTestBase {
     return servers;
   }
 
+  /**
+   * Builds a mock cluster with one random server per entry of {@code mockCluster}. The result is
+   * a {@link LinkedHashMap}, so servers iterate in creation order rather than sorted order.
+   * @param mockCluster number of regions to create for each server
+   * @param numTables   passed through to {@code randomRegions} for every server
+   * @return mapping from each generated server to its generated regions
+   */
+  protected Map<ServerName, List<RegionInfo>> mockClusterServersUnsorted(int[] mockCluster,
+    int numTables) {
+    Map<ServerName, List<RegionInfo>> assignment = new LinkedHashMap<>();
+    for (int regionCount : mockCluster) {
+      ServerAndLoad server = randomServer(0);
+      List<RegionInfo> hosted = randomRegions(regionCount, numTables);
+      assignment.put(server.getServerName(), hosted);
+    }
+    return assignment;
+  }
+
   protected TreeMap<ServerName, List<RegionInfo>> mockUniformClusterServers(int[] mockCluster) {
     int numServers = mockCluster.length;
     TreeMap<ServerName, List<RegionInfo>> servers = new TreeMap<>();
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index 21f3a3b66c9..cc16cfe2ec8 100644
--- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -139,6 +139,8 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
       when(rl.getWriteRequestCount()).thenReturn(0L);
       when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
       when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
+      when(rl.getRegionSizeMB()).thenReturn(Size.ZERO);
+      when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f);
       regionLoadMap.put(info.getRegionName(), rl);
     }
     when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);
@@ -213,6 +215,8 @@ public class TestStochasticLoadBalancer extends StochasticBalancerTestBase {
       when(rl.getWriteRequestCount()).thenReturn(0L);
       when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
       when(rl.getStoreFileSize()).thenReturn(new Size(i, Size.Unit.MEGABYTE));
+      when(rl.getRegionSizeMB()).thenReturn(Size.ZERO);
+      when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f);
 
       Map<byte[], RegionMetrics> regionLoadMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
       regionLoadMap.put(Bytes.toBytes(REGION_KEY), rl);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 12479979b2b..2aa9ecf69ec 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1335,6 +1335,18 @@ public final class HConstants {
    */
   public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
 
+  /**
+   * If the chosen ioengine can persist its state across restarts, the path to the file to persist
+   * to. This file is NOT the data file. It is a file into which we will serialize the map of what
+   * is in the data file. For example, if you pass the following argument as
+   * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
+   * <code>file:/tmp/bucketcache.data </code>, then we will write the bucketcache data to the file
+   * <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
+   * is an in-memory map that needs to be persisted across restarts. Where to store this in-memory
+   * state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
+   */
+  public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path";
+
   /**
    * HConstants for fast fail on the client side follow
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
index 38a296aad52..6956d584d92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 
 import java.io.IOException;
@@ -47,18 +48,6 @@ public final class BlockCacheFactory {
   public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy";
   public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU";
 
-  /**
-   * If the chosen ioengine can persist its state across restarts, the path to the file to persist
-   * to. This file is NOT the data file. It is a file into which we will serialize the map of what
-   * is in the data file. For example, if you pass the following argument as
-   * BUCKET_CACHE_IOENGINE_KEY ("hbase.bucketcache.ioengine"),
-   * <code>file:/tmp/bucketcache.data </code>, then we will write the bucketcache data to the file
-   * <code>/tmp/bucketcache.data</code> but the metadata on where the data is in the supplied file
-   * is an in-memory map that needs to be persisted across restarts. Where to store this in-memory
-   * state is what you supply here: e.g. <code>/tmp/bucketcache.map</code>.
-   */
-  public static final String BUCKET_CACHE_PERSISTENT_PATH_KEY = "hbase.bucketcache.persistent.path";
-
   public static final String BUCKET_CACHE_WRITER_THREADS_KEY = "hbase.bucketcache.writer.threads";
 
   public static final String BUCKET_CACHE_WRITER_QUEUE_KEY = "hbase.bucketcache.writer.queuelength";
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index ca7750c92c5..f321d034bc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -347,6 +347,7 @@ public class BucketCache implements BlockCache, HeapSize {
         fullyCachedFiles.clear();
         backingMapValidated.set(true);
         bucketAllocator = new BucketAllocator(capacity, bucketSizes);
+        regionCachedSizeMap.clear();
       }
     } else {
       bucketAllocator = new BucketAllocator(capacity, bucketSizes);
@@ -1517,6 +1518,7 @@ public class BucketCache implements BlockCache, HeapSize {
       // If persistent ioengine and a path, we will serialize out the backingMap.
       this.backingMap.clear();
       this.fullyCachedFiles.clear();
+      this.regionCachedSizeMap.clear();
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
new file mode 100644
index 00000000000..3ecd8dc7cfd
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancer.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Size;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+@Category({ LargeTests.class })
+public class TestCacheAwareLoadBalancer extends BalancerTestBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCacheAwareLoadBalancer.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestCacheAwareLoadBalancer.class);
+
+  private static CacheAwareLoadBalancer loadBalancer;
+
+  static List<ServerName> servers;
+
+  static List<TableDescriptor> tableDescs;
+
+  static Map<TableName, String> tableMap = new HashMap<>();
+
+  static TableName[] tables = new TableName[] { TableName.valueOf("dt1"), TableName.valueOf("dt2"),
+    TableName.valueOf("dt3"), TableName.valueOf("dt4") };
+
+  /**
+   * Creates {@code numServers} server names with a random host ("server" plus a number below
+   * 100000), a random port below 60000 and a start code of -1.
+   */
+  private static List<ServerName> generateServers(int numServers) {
+    List<ServerName> generated = new ArrayList<>(numServers);
+    Random random = ThreadLocalRandom.current();
+    while (generated.size() < numServers) {
+      String hostname = "server" + random.nextInt(100000);
+      int port = random.nextInt(60000);
+      generated.add(ServerName.valueOf(hostname, port, -1));
+    }
+    return generated;
+  }
+
+  /**
+   * Builds one default {@link TableDescriptor} for every entry of {@code tables}.
+   * @param hasBogusTable not read by this method
+   */
+  private static List<TableDescriptor> constructTableDesc(boolean hasBogusTable) {
+    List<TableDescriptor> descriptors = Lists.newArrayList();
+    for (TableName table : tables) {
+      descriptors.add(TableDescriptorBuilder.newBuilder(table).build());
+    }
+    return descriptors;
+  }
+
+  /**
+   * Creates a mocked {@link ServerMetrics} for one server.
+   * @param server              not read by this method
+   * @param regionsOnServer     regions that get a mocked {@link RegionMetrics} entry
+   * @param currentCacheRatio   value returned by {@code getCurrentRegionCachedRatio()} for each of
+   *                            those regions
+   * @param oldRegionCacheInfo  regions reported through {@code getRegionCachedInfo()}
+   * @param oldRegionCachedSize cached size reported for every region of
+   *                            {@code oldRegionCacheInfo}
+   * @param regionSize          region size, in megabytes, returned by {@code getRegionSizeMB()}
+   * @return the mocked server metrics
+   */
+  private ServerMetrics mockServerMetricsWithRegionCacheInfo(ServerName server,
+    List<RegionInfo> regionsOnServer, float currentCacheRatio, List<RegionInfo> oldRegionCacheInfo,
+    int oldRegionCachedSize, int regionSize) {
+    ServerMetrics metrics = mock(ServerMetrics.class);
+
+    // Per-region metrics: no requests and empty stores, only the cache ratio and size vary.
+    Map<byte[], RegionMetrics> metricsByRegionName = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (RegionInfo region : regionsOnServer) {
+      RegionMetrics regionMetrics = mock(RegionMetrics.class);
+      when(regionMetrics.getReadRequestCount()).thenReturn(0L);
+      when(regionMetrics.getWriteRequestCount()).thenReturn(0L);
+      when(regionMetrics.getMemStoreSize()).thenReturn(Size.ZERO);
+      when(regionMetrics.getStoreFileSize()).thenReturn(Size.ZERO);
+      when(regionMetrics.getCurrentRegionCachedRatio()).thenReturn(currentCacheRatio);
+      when(regionMetrics.getRegionSizeMB())
+        .thenReturn(new Size(regionSize, Size.Unit.MEGABYTE));
+      metricsByRegionName.put(region.getRegionName(), regionMetrics);
+    }
+    when(metrics.getRegionMetrics()).thenReturn(metricsByRegionName);
+
+    // Cached sizes this server reports for the regions of oldRegionCacheInfo.
+    Map<String, Integer> cachedSizeByRegion = new HashMap<>();
+    for (RegionInfo region : oldRegionCacheInfo) {
+      cachedSizeByRegion.put(region.getEncodedName(), oldRegionCachedSize);
+    }
+    when(metrics.getRegionCachedInfo()).thenReturn(cachedSizeByRegion);
+    return metrics;
+  }
+
+  /**
+   * One-time setup shared by all tests: creates three random servers, the table descriptors and a
+   * {@link CacheAwareLoadBalancer} configured with a persistent bucket cache path.
+   */
+  @BeforeClass
+  public static void beforeAllTests() throws Exception {
+    servers = generateServers(3);
+    tableDescs = constructTableDesc(false);
+    Configuration conf = HBaseConfiguration.create();
+    // CacheAwareCostFunction sets its multiplier to 0 when this key is absent, so it has to be
+    // set for the cache cost to take part in balancing.
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "prefetch_file_list");
+    loadBalancer = new CacheAwareLoadBalancer();
+    loadBalancer.setClusterInfoProvider(new DummyClusterInfoProvider(conf));
+    loadBalancer.loadConf(conf);
+  }
+
+  @Test
+  public void testRegionsNotCachedOnOldServerAndCurrentServer() throws Exception {
+    // Every region reports a cache ratio of 0 and no server reports old cache info, so the only
+    // imbalance is the skew in region counts, which the balancer should fix.
+    ServerName server0 = servers.get(0);
+    ServerName server1 = servers.get(1);
+    ServerName server2 = servers.get(2);
+
+    // Skewed layout: 10 regions on server0, none on server1 and 5 on server2
+    List<RegionInfo> regionsOnServer0 = randomRegions(10);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+
+    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+    clusterState.put(server0, regionsOnServer0);
+    clusterState.put(server1, regionsOnServer1);
+    clusterState.put(server2, regionsOnServer2);
+
+    // Mock cluster metrics: nothing cached, each region is 10 MB
+    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+      0.0f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+      0.0f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+      0.0f, new ArrayList<>(), 0, 10));
+    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+    loadBalancer.updateClusterMetrics(clusterMetrics);
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
+      (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
+
+    // Collect the plans that move a region away from server0, grouped by destination
+    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
+    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
+    for (RegionPlan plan : plans) {
+      if (!plan.getSource().equals(server0)) {
+        continue;
+      }
+      regionsMovedFromServer0.add(plan.getRegionInfo());
+      targetServers.computeIfAbsent(plan.getDestination(), destination -> new ArrayList<>())
+        .add(plan.getRegionInfo());
+    }
+    // should move 5 regions from server0 to server 1
+    assertEquals(5, regionsMovedFromServer0.size());
+    assertEquals(5, targetServers.get(server1).size());
+  }
+
+  @Test
+  public void testRegionsPartiallyCachedOnOldServerAndNotCachedOnCurrentServer() throws Exception {
+    // The regions are partially cached on old server but not cached on the current server
+
+    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+    ServerName server0 = servers.get(0);
+    ServerName server1 = servers.get(1);
+    ServerName server2 = servers.get(2);
+
+    // Simulate that the regions previously hosted by server1 are now hosted on server0
+    List<RegionInfo> regionsOnServer0 = randomRegions(10);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+
+    clusterState.put(server0, regionsOnServer0);
+    clusterState.put(server1, regionsOnServer1);
+    clusterState.put(server2, regionsOnServer2);
+
+    // Mock cluster metrics
+
+    // Mock 5 regions from server0 were previously hosted on server1. The upper bound of subList
+    // is exclusive, so size() (not size() - 1) is needed to select the regions at indexes 5..9.
+    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
+
+    // server1 reports a cached size of 6 for each of those regions against a region size of 10,
+    // i.e. they are only partially cached there.
+    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+      0.0f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+      0.0f, oldCachedRegions, 6, 10));
+    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+      0.0f, new ArrayList<>(), 0, 10));
+    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+    loadBalancer.updateClusterMetrics(clusterMetrics);
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+      (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
+
+    // Collect the plans that move a region away from server0, grouped by destination
+    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
+    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
+    for (RegionPlan plan : plans) {
+      if (plan.getSource().equals(server0)) {
+        regionsMovedFromServer0.add(plan.getRegionInfo());
+        targetServers.computeIfAbsent(plan.getDestination(), destination -> new ArrayList<>())
+          .add(plan.getRegionInfo());
+      }
+    }
+    // should move 5 regions from server0 to server1
+    assertEquals(5, regionsMovedFromServer0.size());
+    assertEquals(5, targetServers.get(server1).size());
+    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
+  }
+
+  @Test
+  public void testRegionsFullyCachedOnOldServerAndNotCachedOnCurrentServers() throws Exception {
+    // The regions are fully cached on old server
+
+    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+    ServerName server0 = servers.get(0);
+    ServerName server1 = servers.get(1);
+    ServerName server2 = servers.get(2);
+
+    // Simulate that the regions previously hosted by server1 are now hosted on server0
+    List<RegionInfo> regionsOnServer0 = randomRegions(10);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+
+    clusterState.put(server0, regionsOnServer0);
+    clusterState.put(server1, regionsOnServer1);
+    clusterState.put(server2, regionsOnServer2);
+
+    // Mock cluster metrics
+
+    // Mock 5 regions from server0 were previously hosted on server1. The upper bound of subList
+    // is exclusive, so size() (not size() - 1) is needed to select the regions at indexes 5..9.
+    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
+
+    // server1 reports a cached size of 10 for each of those regions, equal to the region size of
+    // 10, i.e. they are fully cached there.
+    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+      0.0f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+      0.0f, oldCachedRegions, 10, 10));
+    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+      0.0f, new ArrayList<>(), 0, 10));
+    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+    loadBalancer.updateClusterMetrics(clusterMetrics);
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> LoadOfAllTable =
+      (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(LoadOfAllTable);
+
+    // Collect the plans that move a region away from server0, grouped by destination
+    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
+    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
+    for (RegionPlan plan : plans) {
+      if (plan.getSource().equals(server0)) {
+        regionsMovedFromServer0.add(plan.getRegionInfo());
+        targetServers.computeIfAbsent(plan.getDestination(), destination -> new ArrayList<>())
+          .add(plan.getRegionInfo());
+      }
+    }
+    // should move 5 regions from server0 to server1
+    assertEquals(5, regionsMovedFromServer0.size());
+    assertEquals(5, targetServers.get(server1).size());
+    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
+  }
+
+  @Test
+  public void testRegionsFullyCachedOnOldAndCurrentServers() throws Exception {
+    // The regions are fully cached on old server
+
+    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+    ServerName server0 = servers.get(0);
+    ServerName server1 = servers.get(1);
+    ServerName server2 = servers.get(2);
+
+    // Simulate that the regions previously hosted by server1 are now hosted on server0
+    List<RegionInfo> regionsOnServer0 = randomRegions(10);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+
+    clusterState.put(server0, regionsOnServer0);
+    clusterState.put(server1, regionsOnServer1);
+    clusterState.put(server2, regionsOnServer2);
+
+    // Mock cluster metrics
+
+    // Mock 5 regions from server0 were previously hosted on server1. The end index of subList is
+    // exclusive, so size() (not size() - 1) is needed to select all 5 regions at indexes 5..9
+    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
+
+    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+      1.0f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+      1.0f, oldCachedRegions, 10, 10));
+    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+      1.0f, new ArrayList<>(), 0, 10));
+    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+    loadBalancer.updateClusterMetrics(clusterMetrics);
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
+      (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
+    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
+    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
+    // Collect the regions planned to leave server0, grouped by destination server
+    for (RegionPlan plan : plans) {
+      if (plan.getSource().equals(server0)) {
+        regionsMovedFromServer0.add(plan.getRegionInfo());
+        targetServers.computeIfAbsent(plan.getDestination(), k -> new ArrayList<>())
+          .add(plan.getRegionInfo());
+      }
+    }
+    // should move 5 regions from server0 to server1
+    assertEquals(5, regionsMovedFromServer0.size());
+    assertEquals(5, targetServers.get(server1).size());
+    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
+  }
+
+  @Test
+  public void testRegionsPartiallyCachedOnOldServerAndCurrentServer() throws Exception {
+    // The regions are partially cached on old server
+
+    Map<ServerName, List<RegionInfo>> clusterState = new HashMap<>();
+    ServerName server0 = servers.get(0);
+    ServerName server1 = servers.get(1);
+    ServerName server2 = servers.get(2);
+
+    // Simulate that the regions previously hosted by server1 are now hosted on server0
+    List<RegionInfo> regionsOnServer0 = randomRegions(10);
+    List<RegionInfo> regionsOnServer1 = randomRegions(0);
+    List<RegionInfo> regionsOnServer2 = randomRegions(5);
+
+    clusterState.put(server0, regionsOnServer0);
+    clusterState.put(server1, regionsOnServer1);
+    clusterState.put(server2, regionsOnServer2);
+
+    // Mock cluster metrics
+
+    // Mock 5 regions from server0 were previously hosted on server1. The end index of subList is
+    // exclusive, so size() (not size() - 1) is needed to select all 5 regions at indexes 5..9
+    List<RegionInfo> oldCachedRegions = regionsOnServer0.subList(5, regionsOnServer0.size());
+
+    Map<ServerName, ServerMetrics> serverMetricsMap = new TreeMap<>();
+    serverMetricsMap.put(server0, mockServerMetricsWithRegionCacheInfo(server0, regionsOnServer0,
+      0.2f, new ArrayList<>(), 0, 10));
+    serverMetricsMap.put(server1, mockServerMetricsWithRegionCacheInfo(server1, regionsOnServer1,
+      0.0f, oldCachedRegions, 6, 10));
+    serverMetricsMap.put(server2, mockServerMetricsWithRegionCacheInfo(server2, regionsOnServer2,
+      1.0f, new ArrayList<>(), 0, 10));
+    ClusterMetrics clusterMetrics = mock(ClusterMetrics.class);
+    when(clusterMetrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+    loadBalancer.updateClusterMetrics(clusterMetrics);
+
+    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable =
+      (Map) mockClusterServersWithTables(clusterState);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
+    Set<RegionInfo> regionsMovedFromServer0 = new HashSet<>();
+    Map<ServerName, List<RegionInfo>> targetServers = new HashMap<>();
+    // Collect the regions planned to leave server0, grouped by destination server
+    for (RegionPlan plan : plans) {
+      if (plan.getSource().equals(server0)) {
+        regionsMovedFromServer0.add(plan.getRegionInfo());
+        targetServers.computeIfAbsent(plan.getDestination(), k -> new ArrayList<>())
+          .add(plan.getRegionInfo());
+      }
+    }
+    assertEquals(5, regionsMovedFromServer0.size());
+    assertEquals(5, targetServers.get(server1).size());
+    assertTrue(targetServers.get(server1).containsAll(oldCachedRegions));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java
new file mode 100644
index 00000000000..448e576b1bc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestCacheAwareLoadBalancerCostFunctions.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.balancer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Checks which cost functions {@link CacheAwareLoadBalancer} enables for a given configuration and
+ * verifies the value computed by its cache-aware cost function against mocked cluster layouts.
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestCacheAwareLoadBalancerCostFunctions extends StochasticBalancerTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCacheAwareLoadBalancerCostFunctions.class);
+
+  // Expected cache cost for each data set, in the same order as clusterRegionCacheRatioMocks
+  private final float[] expectedCacheCost = { 0.0f, 0.0f, 0.5f, 1.0f, 0.0f, 0.572f, 0.0f, 0.075f };
+
+  /**
+   * Data set for testCacheCost: [test][0] = number of regions hosted by each server (index =
+   * server index), [test][region + 1][0] = server the region is hosted on,
+   * [test][region + 1][server + 1] = percentage of the region cached on that server
+   */
+  private final int[][][] clusterRegionCacheRatioMocks = new int[][][] {
+    // Test 1: each region is cached entirely on the server that hosts it
+    // Cost of moving the regions in this case should be high as the regions are fully cached
+    // on the server they are currently hosted on
+    new int[][] { new int[] { 2, 1, 1 }, // Server 0 has 2, server 1 has 1 and server 2 has 1
+      // region(s) hosted respectively
+      new int[] { 0, 100, 0, 0 }, // region 0 is hosted and cached only on server 0
+      new int[] { 0, 100, 0, 0 }, // region 1 is hosted and cached only on server 0
+      new int[] { 1, 0, 100, 0 }, // region 2 is hosted and cached only on server 1
+      new int[] { 2, 0, 0, 100 }, // region 3 is hosted and cached only on server 2
+    },
+
+    // Test 2: each region is cached completely on the server it is currently hosted on,
+    // but it was also cached on some other server historically
+    // Cost of moving the regions in this case should be high as the regions are fully cached
+    // on the server they are currently hosted on. Although, the regions were previously hosted and
+    // cached on some other server, since they are completely cached on the new server,
+    // there is no need to move the regions back to the previously hosting server
+    new int[][] { new int[] { 1, 2, 1 }, // Server 0 has 1, server 1 has 2 and server 2 has 1
+      // region(s) hosted respectively
+      new int[] { 0, 100, 0, 100 }, // region 0 is hosted and currently cached on server 0,
+      // but previously cached completely on server 2
+      new int[] { 1, 100, 100, 0 }, // region 1 is hosted and currently cached on server 1,
+      // but previously cached completely on server 0
+      new int[] { 1, 0, 100, 100 }, // region 2 is hosted and currently cached on server 1,
+      // but previously cached on server 2
+      new int[] { 2, 0, 100, 100 }, // region 3 is hosted and currently cached on server 2,
+      // but previously cached on server 1
+    },
+
+    // Test 3: The regions were hosted and fully cached on a server but later moved to other
+    // server because of server crash procedure. The regions are partially cached on the server
+    // they are currently hosted on
+    new int[][] { new int[] { 1, 2, 1 }, // Server 0 has 1, server 1 has 2 and server 2 has 1
+      new int[] { 0, 50, 0, 100 }, // Region 0 is currently hosted and partially cached on
+      // server 0, but was fully cached on server 2 previously
+      new int[] { 1, 100, 50, 0 }, // Region 1 is currently hosted and partially cached on
+      // server 1, but was fully cached on server 0 previously
+      new int[] { 1, 0, 50, 100 }, // Region 2 is currently hosted and partially cached on
+      // server 1, but was fully cached on server 2 previously
+      new int[] { 2, 0, 100, 50 }, // Region 3 is currently hosted and partially cached on
+      // server 2, but was fully cached on server 1 previously
+    },
+
+    // Test 4: The regions were hosted and fully cached on a server, but later moved to other
+    // server because of server crash procedure. The regions are not at all cached on the server
+    // they are currently hosted on
+    new int[][] { new int[] { 1, 1, 2 }, // Server 0 has 1, server 1 has 1 and server 2 has 2
+      new int[] { 0, 0, 0, 100 }, // Region 0 is currently hosted but not cached on server 0,
+      // but was fully cached on server 2 previously
+      new int[] { 1, 100, 0, 0 }, // Region 1 is currently hosted but not cached on server 1,
+      // but was fully cached on server 0 previously
+      new int[] { 2, 0, 100, 0 }, // Region 2 is currently hosted but not cached on server 2,
+      // but was fully cached on server 1 previously
+      new int[] { 2, 100, 0, 0 }, // Region 3 is currently hosted but not cached on server 2,
+      // but was fully cached on server 0 previously
+    },
+
+    // Test 5: The regions were partially cached on old servers, before moving to the new server
+    // where also, they are partially cached
+    new int[][] { new int[] { 2, 1, 1 }, // Server 0 has 2, server 1 has 1 and server 2 has 1
+      new int[] { 0, 50, 50, 0 }, // Region 0 is hosted and partially cached on server 0, but
+      // was previously hosted and partially cached on server 1
+      new int[] { 0, 50, 0, 50 }, // Region 1 is hosted and partially cached on server 0, but
+      // was previously hosted and partially cached on server 2
+      new int[] { 1, 0, 50, 50 }, // Region 2 is hosted and partially cached on server 1, but
+      // was previously hosted and partially cached on server 2
+      new int[] { 2, 0, 50, 50 }, // Region 3 is hosted and partially cached on server 2, but
+      // was previously hosted and partially cached on server 1
+    },
+
+    // Test 6: The regions are less cached on the new servers as compared to what they were
+    // cached on the server before they were moved to the new servers
+    new int[][] { new int[] { 1, 2, 1 }, // Server 0 has 1, server 1 has 2 and server 2 has 1
+      new int[] { 0, 30, 70, 0 }, // Region 0 is hosted and cached 30% on server 0, but was
+      // previously hosted and cached 70% on server 1
+      new int[] { 1, 70, 30, 0 }, // Region 1 is hosted and cached 30% on server 1, but was
+      // previously hosted and cached 70% on server 0
+      new int[] { 1, 0, 30, 70 }, // Region 2 is hosted and cached 30% on server 1, but was
+      // previously hosted and cached 70% on server 2
+      new int[] { 2, 0, 70, 30 }, // Region 3 is hosted and cached 30% on server 2, but was
+      // previously hosted and cached 70% on server 1
+    },
+
+    // Test 7: The regions are more cached on the new servers as compared to what they were
+    // cached on the server before they were moved to the new servers
+    new int[][] { new int[] { 2, 1, 1 }, // Server 0 has 2, server 1 has 1 and server 2 has 1
+      new int[] { 0, 80, 20, 0 }, // Region 0 is hosted and 80% cached on server 0, but was
+      // previously hosted and 20% cached on server 1
+      new int[] { 0, 80, 0, 20 }, // Region 1 is hosted and 80% cached on server 0, but was
+      // previously hosted and 20% cached on server 2
+      new int[] { 1, 20, 80, 0 }, // Region 2 is hosted and 80% cached on server 1, but was
+      // previously hosted and 20% cached on server 0
+      new int[] { 2, 0, 20, 80 }, // Region 3 is hosted and 80% cached on server 2, but was
+      // previously hosted and 20% cached on server 1
+    },
+
+    // Test 8: The regions are randomly assigned to the server with some regions historically
+    // hosted on other region servers
+    new int[][] { new int[] { 1, 2, 1 }, // Server 0 has 1, server 1 has 2 and server 2 has 1
+      new int[] { 0, 34, 0, 58 }, // Region 0 is hosted and partially cached on server 0,
+      // but was previously hosted and partially cached on server 2
+      // current cache ratio < historical cache ratio
+      new int[] { 1, 78, 100, 0 }, // Region 1 is hosted and fully cached on server 1,
+      // but was previously hosted and partially cached on server 0
+      // current cache ratio > historical cache ratio
+      new int[] { 1, 66, 66, 0 }, // Region 2 is hosted and partially cached on server 1,
+      // but was previously hosted and partially cached on server 0
+      // current cache ratio == historical cache ratio
+      new int[] { 2, 0, 0, 96 }, // Region 3 is hosted and partially cached on server 2
+      // No historical cache ratio
+    }, };
+
+  private static Configuration storedConfiguration;
+
+  private final CacheAwareLoadBalancer loadBalancer = new CacheAwareLoadBalancer();
+
+  // Takes a snapshot of the inherited conf so that each test can start from the same settings
+  @BeforeClass
+  public static void saveInitialConfiguration() {
+    storedConfiguration = new Configuration(conf);
+  }
+
+  // Restores conf from the snapshot and loads it into the shared balancer
+  @Before
+  public void beforeEachTest() {
+    conf = new Configuration(storedConfiguration);
+    loadBalancer.loadConf(conf);
+  }
+
+  @Test
+  public void testVerifyCacheAwareSkewnessCostFunctionEnabled() {
+    CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer();
+    lb.loadConf(conf);
+    assertTrue(Arrays.asList(lb.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheAwareSkewnessCostFunctionDisabled() {
+    conf.setFloat(
+      CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.REGION_COUNT_SKEW_COST_KEY, 0.0f);
+
+    CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer();
+    lb.loadConf(conf);
+
+    assertFalse(Arrays.asList(lb.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareRegionSkewnessCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheCostFunctionEnabled() {
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence");
+
+    CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer();
+    lb.loadConf(conf);
+
+    assertTrue(Arrays.asList(lb.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheCostFunctionDisabledByNoBucketCachePersistence() {
+    assertFalse(Arrays.asList(loadBalancer.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName()));
+  }
+
+  @Test
+  public void testVerifyCacheCostFunctionDisabledByNoMultiplier() {
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence");
+    conf.setFloat("hbase.master.balancer.stochastic.cacheCost", 0.0f);
+    // Use a fresh balancer: the shared one loaded conf in beforeEachTest(), before these settings
+    CacheAwareLoadBalancer lb = new CacheAwareLoadBalancer();
+    lb.loadConf(conf);
+    assertFalse(Arrays.asList(lb.getCostFunctionNames())
+      .contains(CacheAwareLoadBalancer.CacheAwareCostFunction.class.getSimpleName()));
+  }
+
+  /**
+   * Runs the cache cost function over every mocked data set and compares the computed cost with
+   * the matching entry of expectedCacheCost (tolerance 0.01).
+   */
+  @Test
+  public void testCacheCost() {
+    conf.set(HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY, "/tmp/prefetch.persistence");
+    CacheAwareLoadBalancer.CacheAwareCostFunction costFunction =
+      new CacheAwareLoadBalancer.CacheAwareCostFunction(conf);
+
+    for (int test = 0; test < clusterRegionCacheRatioMocks.length; test++) {
+      int[][] clusterRegionLocations = clusterRegionCacheRatioMocks[test];
+      MockClusterForCacheCost cluster = new MockClusterForCacheCost(clusterRegionLocations);
+      costFunction.prepare(cluster);
+      double cost = costFunction.cost();
+      assertEquals("Test " + (test + 1), expectedCacheCost[test], cost, 0.01);
+    }
+  }
+
+  /**
+   * Cluster state that serves the cache ratios of one clusterRegionCacheRatioMocks data set.
+   */
+  private class MockClusterForCacheCost extends BalancerClusterState {
+    private final Map<Pair<Integer, Integer>, Float> regionServerCacheRatio = new HashMap<>();
+
+    public MockClusterForCacheCost(int[][] regionsArray) {
+      // regionsArray[0] is an array where index = serverIndex and value = number of regions
+      super(mockClusterServersUnsorted(regionsArray[0], 1), null, null, null, null);
+      Map<String, Pair<ServerName, Float>> oldCacheRatio = new HashMap<>();
+      for (int i = 1; i < regionsArray.length; i++) {
+        int regionIndex = i - 1;
+        for (int j = 1; j < regionsArray[i].length; j++) {
+          int serverIndex = j - 1;
+          float cacheRatio = (float) regionsArray[i][j] / 100;
+          regionServerCacheRatio.put(new Pair<>(regionIndex, serverIndex), cacheRatio);
+          if (cacheRatio > 0.0f && serverIndex != regionsArray[i][0]) {
+            // This is the historical cacheRatio value
+            oldCacheRatio.put(regions[regionIndex].getEncodedName(),
+              new Pair<>(servers[serverIndex], cacheRatio));
+          }
+        }
+      }
+      regionCacheRatioOnOldServerMap = oldCacheRatio;
+    }
+
+    // Every mocked region reports the same HFile size (1)
+    @Override
+    public int getTotalRegionHFileSizeMB(int region) {
+      return 1;
+    }
+
+    @Override
+    protected float getRegionCacheRatioOnRegionServer(int region, int regionServerIndex) {
+      // Get the cache ratio if the region is currently hosted on this server
+      if (regionServerIndex == regionIndexToServerIndex[region]) {
+        return regionServerCacheRatio.get(new Pair<>(region, regionServerIndex));
+      }
+
+      // Region is not currently hosted on this server. Check if the region was cached on this
+      // server earlier. This can happen when the server was shutdown and the cache was persisted.
+      // Search using the region name and server name and not the region index and server index as
+      // these indexes may change when a server is marked as dead or a new server is added.
+      String regionEncodedName = regions[region].getEncodedName();
+      ServerName serverName = servers[regionServerIndex];
+      if (
+        regionCacheRatioOnOldServerMap != null
+          && regionCacheRatioOnOldServerMap.containsKey(regionEncodedName)
+      ) {
+        Pair<ServerName, Float> serverCacheRatio =
+          regionCacheRatioOnOldServerMap.get(regionEncodedName);
+        if (ServerName.isSameAddress(serverName, serverCacheRatio.getFirst())) {
+          // Consume the historical entry: it is removed once its ratio has been returned
+          regionCacheRatioOnOldServerMap.remove(regionEncodedName);
+          return serverCacheRatio.getSecond();
+        }
+      }
+      return 0.0f;
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
index 748045246b3..67ef296da58 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal.java
@@ -86,6 +86,8 @@ public class TestRSGroupBasedLoadBalancerWithStochasticLoadBalancerAsInternal
       when(rl.getWriteRequestCount()).thenReturn(0L);
       when(rl.getMemStoreSize()).thenReturn(Size.ZERO);
       when(rl.getStoreFileSize()).thenReturn(Size.ZERO);
+      when(rl.getRegionSizeMB()).thenReturn(Size.ZERO);
+      when(rl.getCurrentRegionCachedRatio()).thenReturn(0.0f);
       regionLoadMap.put(info.getRegionName(), rl);
     }
     when(serverMetrics.getRegionMetrics()).thenReturn(regionLoadMap);