Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/27 04:02:26 UTC

[hbase] branch master updated: HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b27124  HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)
1b27124 is described below

commit 1b27124c61de0b3544d6bdb1aec132b0accc00a1
Author: Bryan Beaudreault <bb...@hubspot.com>
AuthorDate: Fri Nov 26 23:01:46 2021 -0500

    HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../balancer/RegionHDFSBlockLocationFinder.java    |  68 +++++++-
 .../TestRegionHDFSBlockLocationFinder.java         |  58 +++++++
 hbase-common/src/main/resources/hbase-default.xml  |  19 +++
 .../java/org/apache/hadoop/hbase/io/FileLink.java  |  15 ++
 .../hadoop/hbase/regionserver/HStoreFile.java      |  15 +-
 .../regionserver/InputStreamBlockDistribution.java | 162 ++++++++++++++++++
 .../java/org/apache/hadoop/hbase/util/FSUtils.java |  35 ++++
 .../org/apache/hadoop/hbase/io/TestFileLink.java   |  35 ++++
 .../TestInputStreamBlockDistribution.java          | 182 +++++++++++++++++++++
 .../org/apache/hadoop/hbase/util/TestFSUtils.java  |  38 ++++-
 10 files changed, 615 insertions(+), 12 deletions(-)

diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java
index 65a7a3f..9634dd1 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -40,7 +42,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
 import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
@@ -58,6 +59,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
 class RegionHDFSBlockLocationFinder extends Configured {
   private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
   private static final long CACHE_TIME = 240 * 60 * 1000;
+  private static final float EPSILON = 0.0001f;
   private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
     new HDFSBlocksDistribution();
   private volatile ClusterMetrics status;
@@ -110,12 +112,70 @@ class RegionHDFSBlockLocationFinder extends Configured {
 
   void setClusterMetrics(ClusterMetrics status) {
     long currentTime = EnvironmentEdgeManager.currentTime();
-    this.status = status;
+
     if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
+      this.status = status;
       // Only count the refresh if it includes user tables ( eg more than meta and namespace ).
       lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
+    } else {
+      refreshLocalityChangedRegions(this.status, status);
+      this.status = status;
+    }
+  }
+
+  /**
+   * If locality for a region has changed, that almost certainly means our cache is out of date.
+   * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
+   */
+  private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
+    if (oldStatus == null || newStatus == null) {
+      LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}",
+        oldStatus, newStatus);
+      return;
+    }
+
+    Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
+    Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
+
+    Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
+    for (RegionInfo regionInfo : cache.asMap().keySet()) {
+      regionsByName.put(regionInfo.getEncodedName(), regionInfo);
+    }
+
+    for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
+      Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
+      for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
+        String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
+        RegionInfo region = regionsByName.get(encodedName);
+        if (region == null) {
+          continue;
+        }
+
+        float newLocality = regionEntry.getValue().getDataLocality();
+        float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
+
+        if (Math.abs(newLocality - oldLocality) > EPSILON) {
+          LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
+            region.getEncodedName(), oldLocality, newLocality);
+          cache.refresh(region);
+        }
+      }
+
+    }
+  }
+
+  private float getOldLocality(ServerName newServer, byte[] regionName,
+    Map<ServerName, ServerMetrics> oldServers) {
+    ServerMetrics serverMetrics = oldServers.get(newServer);
+    if (serverMetrics == null) {
+      return -1f;
+    }
+    RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
+    if (regionMetrics == null) {
+      return -1f;
     }
 
+    return regionMetrics.getDataLocality();
   }
 
   /**
@@ -159,7 +219,7 @@ class RegionHDFSBlockLocationFinder extends Configured {
         return blocksDistribution;
       }
     } catch (IOException ioe) {
-      LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = " +
+      LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
         region.getEncodedName(), ioe);
     }
 
@@ -263,7 +323,7 @@ class RegionHDFSBlockLocationFinder extends Configured {
       } catch (InterruptedException ite) {
         Thread.currentThread().interrupt();
       } catch (ExecutionException ee) {
-        LOG.debug("ExecutionException during HDFSBlocksDistribution computation. for region = " +
+        LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
           hregionInfo.getEncodedName(), ee);
       }
       index++;
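
The float comparison above is the crux of the targeted refresh: locality values come
from two different metrics snapshots, so they are checked against a small tolerance
rather than with ==. A minimal, self-contained Java sketch of that pattern (names
borrowed from the diff above; not part of the patch itself):

    private static final float EPSILON = 0.0001f;

    // A change registers only when the delta exceeds the tolerance. The -1f
    // sentinel used for "region not previously on this server" always differs
    // from any real locality in [0.0, 1.0], so moved regions refresh too.
    static boolean localityChanged(float oldLocality, float newLocality) {
      return Math.abs(newLocality - oldLocality) > EPSILON;
    }
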
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java
index 8e129e3..a011793 100644
--- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java
+++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.balancer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -31,12 +32,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.TreeMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.RegionMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -204,4 +207,59 @@ public class TestRegionHDFSBlockLocationFinder {
       }
     }
   }
+
+  @Test
+  public void testRefreshRegionsWithChangedLocality() {
+    ServerName testServer = ServerName.valueOf("host-0", 12345, 12345);
+    RegionInfo testRegion = REGIONS.get(0);
+
+    Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      assertHostAndWeightEquals(generate(region), hbd);
+      cache.put(region, hbd);
+    }
+
+    finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
+      0.123f));
+
+    // everything should be cached, because metrics were null before
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      assertSame(cache.get(region), hbd);
+    }
+
+    finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
+      0.345f));
+
+    // locality changed just for our test region, so it should no longer be the same
+    for (RegionInfo region : REGIONS) {
+      HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+      if (region.equals(testRegion)) {
+        assertNotSame(cache.get(region), hbd);
+      } else {
+        assertSame(cache.get(region), hbd);
+      }
+    }
+  }
+
+  private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region,
+    float locality) {
+    RegionMetrics regionMetrics = mock(RegionMetrics.class);
+    when(regionMetrics.getDataLocality()).thenReturn(locality);
+
+    Map<byte[], RegionMetrics> regionMetricsMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    regionMetricsMap.put(region, regionMetrics);
+
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap);
+
+    Map<ServerName, ServerMetrics> serverMetricsMap = new HashMap<>();
+    serverMetricsMap.put(serverName, serverMetrics);
+
+    ClusterMetrics metrics = mock(ClusterMetrics.class);
+    when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+
+    return metrics;
+  }
 }
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index ff3f816..d14792e 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -2042,4 +2042,23 @@ possible configurations would overwhelm and obscure the important.
       the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
     </description>
   </property>
+  <property>
+    <name>hbase.locality.inputstream.derive.enabled</name>
+    <value>false</value>
+    <description>
+      If true, derive StoreFile locality metrics from the underlying DFSInputStream
+      backing reads for that StoreFile. This value will update as the DFSInputStream's
+      block locations are updated over time. Otherwise, locality is computed on StoreFile
+      open, and cached until the StoreFile is closed.
+    </description>
+  </property>
+  <property>
+    <name>hbase.locality.inputstream.derive.cache.period</name>
+    <value>60000</value>
+    <description>
+      If deriving StoreFile locality metrics from the underlying DFSInputStream, how
+      long, in milliseconds, the derived values should be cached. The derivation may
+      involve hitting the namenode if the DFSInputStream's block list is incomplete.
+    </description>
+  </property>
 </configuration>
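
The feature ships disabled, so a deployment has to opt in. A minimal sketch of
setting both properties on a Configuration (in practice they belong in the region
servers' hbase-site.xml; the 300000 ms period is an illustrative choice, not a
recommendation, and the helper class is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class EnableStreamDerivedLocality {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Derive StoreFile locality from the open DFSInputStream.
        conf.setBoolean("hbase.locality.inputstream.derive.enabled", true);
        // Re-derive at most every 5 minutes instead of the 60 second default.
        conf.setInt("hbase.locality.inputstream.derive.cache.period", 300000);
        return conf;
      }
    }
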
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index ba84606..ea285ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -126,6 +126,10 @@ public class FileLink {
       this.in = tryOpen();
     }
 
+    private FSDataInputStream getUnderlyingInputStream() {
+      return in;
+    }
+
     @Override
     public int read() throws IOException {
       int res;
@@ -476,6 +480,17 @@ public class FileLink {
   }
 
   /**
+   * If the passed FSDataInputStream is backed by a FileLink, returns the underlying
+   * InputStream for the resolved link target. Otherwise, returns null.
+   */
+  public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
+    if (stream.getWrappedStream() instanceof FileLinkInputStream) {
+      return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
+    }
+    return null;
+  }
+
+  /**
    * NOTE: This method must be used only in the constructor!
    * It creates a List with the specified locations for the link.
    */
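
This helper is what lets stream-derived locality work through links (HFileLink, used
after snapshot clones, is a FileLink): a FileLink-wrapped stream hides the HDFS
stream one wrapper down. A hedged usage sketch, where resolveHdfsStream is a
hypothetical helper and not part of the patch:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.io.FileLink;
    import org.apache.hadoop.hdfs.client.HdfsDataInputStream;

    static HdfsDataInputStream resolveHdfsStream(FileLink link, FileSystem fs)
        throws IOException {
      FSDataInputStream in = link.open(fs);
      FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(in);
      // Null for non-link streams; non-HDFS targets are likewise unusable here.
      return underlying instanceof HdfsDataInputStream
          ? (HdfsDataInputStream) underlying : null;
    }

The new test in TestFileLink further down exercises this same unwrap path against a
MiniDFSCluster.
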
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 7a74689..ecbc78f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -29,6 +29,7 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -127,6 +128,7 @@ public class HStoreFile implements StoreFile {
 
   // StoreFile.Reader
   private volatile StoreFileReader initialReader;
+  private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
 
   // Block cache configuration and reference.
   private final CacheConfig cacheConf;
@@ -344,7 +346,11 @@ public class HStoreFile implements StoreFile {
    *         file is opened.
    */
   public HDFSBlocksDistribution getHDFSBlockDistribution() {
-    return this.fileInfo.getHDFSBlockDistribution();
+    if (initialReaderBlockDistribution != null) {
+      return initialReaderBlockDistribution.getHDFSBlockDistribution();
+    } else {
+      return this.fileInfo.getHDFSBlockDistribution();
+    }
   }
 
   /**
@@ -362,6 +368,13 @@ public class HStoreFile implements StoreFile {
       fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
     }
     this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
+
+    if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
+      boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
+      FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
+      this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
+    }
+
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java
new file mode 100644
index 0000000..aa15cda
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.io.FileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Computes the HDFSBlockDistribution for a file based on the underlying located blocks
+ * for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
+ * allocating an array of numBlocks size per call. It may also involve calling the namenode, if
+ * the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure,
+ * we cache the computed distribution for a configurable period of time.
+ * <p>
+ * This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e.
+ * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the
+ * value returned by {@link HStoreFile#getHDFSBlockDistribution()}.
+ * <p>
+ * Once the backing FSDataInputStream is closed, we should not expect the distribution result
+ * to change anymore. This is ok because the initialReader's InputStream is only closed when the
+ * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
+ * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will
+ * be created for the new initialReader.
+ */
+@InterfaceAudience.Private
+public class InputStreamBlockDistribution {
+  private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
+
+  private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
+    "hbase.locality.inputstream.derive.enabled";
+  private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
+
+  private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
+    "hbase.locality.inputstream.derive.cache.period";
+  private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
+
+  private final FSDataInputStream stream;
+  private final StoreFileInfo fileInfo;
+  private final int cachePeriodMs;
+
+  private HDFSBlocksDistribution hdfsBlocksDistribution;
+  private long lastCachedAt;
+  private boolean streamUnsupported;
+
+  /**
+   * This should only be called for the first FSDataInputStream of a StoreFile,
+   * in {@link HStoreFile#open()}.
+   *
+   * @see InputStreamBlockDistribution
+   * @param stream the input stream to derive locality from
+   * @param fileInfo the StoreFileInfo for the related store file
+   */
+  public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
+    this.stream = stream;
+    this.fileInfo = fileInfo;
+    this.cachePeriodMs = fileInfo.getConf().getInt(
+      HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
+      DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
+    this.lastCachedAt = EnvironmentEdgeManager.currentTime();
+    this.streamUnsupported = false;
+    this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
+  }
+
+  /**
+   * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
+   */
+  public static boolean isEnabled(Configuration conf) {
+    return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
+      DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
+  }
+
+  /**
+   * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if the
+   * cache has expired.
+   */
+  public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
+    if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
+      try {
+        LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
+        computeBlockDistribution();
+      } catch (IOException e) {
+        LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
+          fileInfo, e);
+      }
+    }
+    return hdfsBlocksDistribution;
+  }
+
+  private void computeBlockDistribution() throws IOException {
+    lastCachedAt = EnvironmentEdgeManager.currentTime();
+
+    FSDataInputStream stream;
+    if (fileInfo.isLink()) {
+      stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
+    } else {
+      stream = this.stream;
+    }
+
+    if (!(stream instanceof HdfsDataInputStream)) {
+      if (!streamUnsupported) {
+        LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
+            + "used to derive locality. Falling back on cached value.",
+          stream, fileInfo, fileInfo.isLink());
+        streamUnsupported = true;
+      }
+      return;
+    }
+
+    streamUnsupported = false;
+    hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
+  }
+
+  /**
+   * For tests only, sets lastCachedAt so we can force a refresh
+   */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  synchronized void setLastCachedAt(long timestamp) {
+    lastCachedAt = timestamp;
+  }
+
+  /**
+   * For tests only, returns the configured cache period
+   */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  long getCachePeriodMs() {
+    return cachePeriodMs;
+  }
+
+  /**
+   * For tests only, returns whether the stream was found to be unsupported for deriving locality
+   */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+    allowedOnPath = ".*/src/test/.*")
+  boolean isStreamUnsupported() {
+    return streamUnsupported;
+  }
+}
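
Consumers are unaffected by the new plumbing: they keep calling
HStoreFile#getHDFSBlockDistribution() and, when the feature is enabled, transparently
get the stream-derived view. A sketch of a typical read, assuming an open HStoreFile;
getBlockLocalityIndex is the existing accessor on HDFSBlocksDistribution:

    static float localityOf(HStoreFile storeFile, String localHostName) {
      HDFSBlocksDistribution dist = storeFile.getHDFSBlockDistribution();
      // 1.0f means localHostName holds a replica of every block in the file.
      return dist.getBlockLocalityIndex(localHostName);
    }
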
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 1156b17..461170d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -81,6 +81,9 @@ import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.Progressable;
@@ -704,6 +707,38 @@ public final class FSUtils {
   }
 
   /**
+   * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
+   * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
+   * This method retrieves those blocks from the input stream and uses them to calculate
+   * the HDFSBlocksDistribution.
+   *
+   * The underlying method in DFSInputStream does attempt to use locally cached blocks, but
+   * may hit the namenode if the cache is determined to be incomplete. The method also involves
+   * making copies of all LocatedBlocks rather than returning the underlying blocks themselves.
+   */
+  public static HDFSBlocksDistribution computeHDFSBlocksDistribution(
+    HdfsDataInputStream inputStream) throws IOException {
+    List<LocatedBlock> blocks = inputStream.getAllBlocks();
+    HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
+    for (LocatedBlock block : blocks) {
+      String[] hosts = getHostsForLocations(block);
+      long len = block.getBlockSize();
+      StorageType[] storageTypes = block.getStorageTypes();
+      blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
+    }
+    return blocksDistribution;
+  }
+
+  private static String[] getHostsForLocations(LocatedBlock block) {
+    DatanodeInfo[] locations = block.getLocations();
+    String[] hosts = new String[locations.length];
+    for (int i = 0; i < hosts.length; i++) {
+      hosts[i] = locations[i].getHostName();
+    }
+    return hosts;
+  }
+
+  /**
    * Compute HDFS blocks distribution of a given file, or a portion of the file
    * @param fs file system
    * @param status file status of the file
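
The new overload complements the existing FileStatus-based computation: same result
type, but sourced from the block list the open stream already holds. A minimal
sketch, assuming fs and path reference a file on HDFS:

    try (FSDataInputStream in = fs.open(path)) {
      FileStatus status = fs.getFileStatus(path);
      HDFSBlocksDistribution fromStatus =
          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
      HDFSBlocksDistribution fromStream = in instanceof HdfsDataInputStream
          ? FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) in)
          : fromStatus; // non-HDFS filesystems never hand back HdfsDataInputStream
      // With a fully fetched block list, the two distributions should agree.
    }
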
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
index 1836960..6a85f98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.ipc.RemoteException;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -89,6 +90,40 @@ public class TestFileLink {
   }
 
   /**
+   * Test that the returned link from {@link FileLink#open(FileSystem)} can be unwrapped
+   * to a {@link HdfsDataInputStream} by
+   * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}
+   */
+  @Test
+  public void testGetUnderlyingFSDataInputStream() throws Exception {
+    HBaseTestingUtil testUtil = new HBaseTestingUtil();
+    Configuration conf = testUtil.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+    testUtil.startMiniDFSCluster(1);
+    try {
+      MiniDFSCluster cluster = testUtil.getDFSCluster();
+      FileSystem fs = cluster.getFileSystem();
+
+      Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
+
+      writeSomeData(fs, originalPath, 256 << 20, (byte) 2);
+
+      List<Path> files = new ArrayList<Path>();
+      files.add(originalPath);
+
+      FileLink link = new FileLink(files);
+      FSDataInputStream stream = link.open(fs);
+
+      FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
+      assertTrue(underlying instanceof HdfsDataInputStream);
+    } finally {
+      testUtil.shutdownMiniCluster();
+    }
+  }
+
+  /**
    * Test, on HDFS, that the FileLink is still readable
    * even when the current file gets renamed.
    */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java
new file mode 100644
index 0000000..2c7872a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.io.FileLink;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestInputStreamBlockDistribution {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class);
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path testPath;
+
+  @Before
+  public void setUp() throws Exception {
+    HBaseTestingUtil testUtil = new HBaseTestingUtil();
+    conf = testUtil.getConfiguration();
+    conf.setInt("dfs.blocksize", 1024 * 1024);
+    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+    testUtil.startMiniDFSCluster(1);
+    MiniDFSCluster cluster = testUtil.getDFSCluster();
+    fs = cluster.getFileSystem();
+
+    testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
+
+    writeSomeData(fs, testPath, 256 << 20, (byte)2);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.delete(testPath, false);
+    fs.close();
+  }
+
+  @Test
+  public void itDerivesLocalityFromHFileInputStream() throws Exception {
+    try (FSDataInputStream stream = fs.open(testPath)) {
+      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+      InputStreamBlockDistribution test =
+        new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));
+
+      assertSame(initial, test.getHDFSBlockDistribution());
+
+      test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+      assertNotSame(initial, test.getHDFSBlockDistribution());
+    }
+
+  }
+
+  @Test
+  public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
+    List<Path> files = new ArrayList<Path>();
+    files.add(testPath);
+
+    FileLink link = new FileLink(files);
+    try (FSDataInputStream stream = link.open(fs)) {
+
+      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+      InputStreamBlockDistribution test = new InputStreamBlockDistribution(stream,
+        getMockedStoreFileInfo(initial, true));
+
+      assertSame(initial, test.getHDFSBlockDistribution());
+
+      test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+      assertNotSame(initial, test.getHDFSBlockDistribution());
+    }
+  }
+
+  @Test
+  public void itFallsBackOnLastKnownValueWhenUnsupported() {
+    FSDataInputStream fakeStream = mock(FSDataInputStream.class);
+    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+    InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
+      getMockedStoreFileInfo(initial, false));
+
+    assertSame(initial, test.getHDFSBlockDistribution());
+    test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+    // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
+    assertSame(initial, test.getHDFSBlockDistribution());
+    assertTrue(test.isStreamUnsupported());
+  }
+
+  @Test
+  public void itFallsBackOnLastKnownValueOnException() throws IOException {
+    HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
+    when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));
+
+    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+    InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
+      getMockedStoreFileInfo(initial, false));
+
+    assertSame(initial, test.getHDFSBlockDistribution());
+    test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+    // fakeStream throws an exception, so falls back on original
+    assertSame(initial, test.getHDFSBlockDistribution());
+
+    assertFalse(test.isStreamUnsupported());
+  }
+
+  /**
+   * Write at least 'size' bytes with value 'v' into a new file called 'path'.
+   */
+  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
+    byte[] data = new byte[4096];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = v;
+    }
+
+    FSDataOutputStream stream = fs.create(path);
+    try {
+      long written = 0;
+      while (written < size) {
+        stream.write(data, 0, data.length);
+        written += data.length;
+      }
+    } finally {
+      stream.close();
+    }
+  }
+
+  private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
+    boolean isFileLink) {
+    StoreFileInfo mock = mock(StoreFileInfo.class);
+    when(mock.getHDFSBlockDistribution())
+      .thenReturn(distribution);
+    when(mock.getConf()).thenReturn(conf);
+    when(mock.isLink()).thenReturn(isFileLink);
+    return mock;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 16d4456..f4a4df5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -105,7 +106,31 @@ public class TestFSUtils {
     out.close();
   }
 
-  @Test public void testcomputeHDFSBlocksDistribution() throws Exception {
+  @Test
+  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
+    testComputeHDFSBlocksDistribution((fs, testFile) -> {
+      try (FSDataInputStream open = fs.open(testFile)) {
+        assertTrue(open instanceof HdfsDataInputStream);
+        return FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) open);
+      }
+    });
+  }
+
+  @Test
+  public void testComputeHDFSBlockDistribution() throws Exception {
+    testComputeHDFSBlocksDistribution((fs, testFile) -> {
+      FileStatus status = fs.getFileStatus(testFile);
+      return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+    });
+  }
+
+  @FunctionalInterface
+  interface HDFSBlockDistributionFunction {
+    HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
+  }
+
+  private void testComputeHDFSBlocksDistribution(
+    HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
     final int DEFAULT_BLOCK_SIZE = 1024;
     conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
     MiniDFSCluster cluster = null;
@@ -129,9 +154,10 @@ public class TestFSUtils {
       boolean ok;
       do {
         ok = true;
-        FileStatus status = fs.getFileStatus(testFile);
+
         HDFSBlocksDistribution blocksDistribution =
-          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+          fileToBlockDistribution.getForPath(fs, testFile);
+
         long uniqueBlocksTotalWeight =
           blocksDistribution.getUniqueBlocksTotalWeight();
         for (String host : hosts) {
@@ -163,9 +189,8 @@ public class TestFSUtils {
       long weight;
       long uniqueBlocksTotalWeight;
       do {
-        FileStatus status = fs.getFileStatus(testFile);
         HDFSBlocksDistribution blocksDistribution =
-          FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+          fileToBlockDistribution.getForPath(fs, testFile);
         uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
 
         String tophost = blocksDistribution.getTopHosts().get(0);
@@ -197,8 +222,7 @@ public class TestFSUtils {
       final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
       HDFSBlocksDistribution blocksDistribution;
       do {
-        FileStatus status = fs.getFileStatus(testFile);
-        blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+        blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
         // NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
       }
       while (blocksDistribution.getTopHosts().size() != 3 &&