You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/27 04:02:26 UTC
[hbase] branch master updated: HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 1b27124 HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)
1b27124 is described below
commit 1b27124c61de0b3544d6bdb1aec132b0accc00a1
Author: Bryan Beaudreault <bb...@hubspot.com>
AuthorDate: Fri Nov 26 23:01:46 2021 -0500
HBASE-26304 Reflect out of band locality improvements in metrics and balancer (#3803)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../balancer/RegionHDFSBlockLocationFinder.java | 68 +++++++-
.../TestRegionHDFSBlockLocationFinder.java | 58 +++++++
hbase-common/src/main/resources/hbase-default.xml | 19 +++
.../java/org/apache/hadoop/hbase/io/FileLink.java | 15 ++
.../hadoop/hbase/regionserver/HStoreFile.java | 15 +-
.../regionserver/InputStreamBlockDistribution.java | 162 ++++++++++++++++++
.../java/org/apache/hadoop/hbase/util/FSUtils.java | 35 ++++
.../org/apache/hadoop/hbase/io/TestFileLink.java | 35 ++++
.../TestInputStreamBlockDistribution.java | 182 +++++++++++++++++++++
.../org/apache/hadoop/hbase/util/TestFSUtils.java | 38 ++++-
10 files changed, 615 insertions(+), 12 deletions(-)
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java
index 65a7a3f..9634dd1 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionHDFSBlockLocationFinder.java
@@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.RegionMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -40,7 +42,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
@@ -58,6 +59,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
class RegionHDFSBlockLocationFinder extends Configured {
private static final Logger LOG = LoggerFactory.getLogger(RegionHDFSBlockLocationFinder.class);
private static final long CACHE_TIME = 240 * 60 * 1000;
+ private static final float EPSILON = 0.0001f;
private static final HDFSBlocksDistribution EMPTY_BLOCK_DISTRIBUTION =
new HDFSBlocksDistribution();
private volatile ClusterMetrics status;
@@ -110,12 +112,70 @@ class RegionHDFSBlockLocationFinder extends Configured {
void setClusterMetrics(ClusterMetrics status) {
long currentTime = EnvironmentEdgeManager.currentTime();
- this.status = status;
+
if (currentTime > lastFullRefresh + (CACHE_TIME / 2)) {
+ this.status = status;
// Only count the refresh if it includes user tables ( eg more than meta and namespace ).
lastFullRefresh = scheduleFullRefresh() ? currentTime : lastFullRefresh;
+ } else {
+ refreshLocalityChangedRegions(this.status, status);
+ this.status = status;
+ }
+ }
+
+ /**
+ * If locality for a region has changed, that pretty certainly means our cache is out of date.
+ * Compare oldStatus and newStatus, refreshing any regions which have moved or changed locality.
+ */
+ private void refreshLocalityChangedRegions(ClusterMetrics oldStatus, ClusterMetrics newStatus) {
+ if (oldStatus == null || newStatus == null) {
+ LOG.debug("Skipping locality-based refresh due to oldStatus={}, newStatus={}",
+ oldStatus, newStatus);
+ return;
+ }
+
+ Map<ServerName, ServerMetrics> oldServers = oldStatus.getLiveServerMetrics();
+ Map<ServerName, ServerMetrics> newServers = newStatus.getLiveServerMetrics();
+
+ Map<String, RegionInfo> regionsByName = new HashMap<>(cache.asMap().size());
+ for (RegionInfo regionInfo : cache.asMap().keySet()) {
+ regionsByName.put(regionInfo.getEncodedName(), regionInfo);
+ }
+
+ for (Map.Entry<ServerName, ServerMetrics> serverEntry : newServers.entrySet()) {
+ Map<byte[], RegionMetrics> newRegions = serverEntry.getValue().getRegionMetrics();
+ for (Map.Entry<byte[], RegionMetrics> regionEntry : newRegions.entrySet()) {
+ String encodedName = RegionInfo.encodeRegionName(regionEntry.getKey());
+ RegionInfo region = regionsByName.get(encodedName);
+ if (region == null) {
+ continue;
+ }
+
+ float newLocality = regionEntry.getValue().getDataLocality();
+ float oldLocality = getOldLocality(serverEntry.getKey(), regionEntry.getKey(), oldServers);
+
+ if (Math.abs(newLocality - oldLocality) > EPSILON) {
+ LOG.debug("Locality for region {} changed from {} to {}, refreshing cache",
+ region.getEncodedName(), oldLocality, newLocality);
+ cache.refresh(region);
+ }
+ }
+
+ }
+ }
+
+ private float getOldLocality(ServerName newServer, byte[] regionName,
+ Map<ServerName, ServerMetrics> oldServers) {
+ ServerMetrics serverMetrics = oldServers.get(newServer);
+ if (serverMetrics == null) {
+ return -1f;
+ }
+ RegionMetrics regionMetrics = serverMetrics.getRegionMetrics().get(regionName);
+ if (regionMetrics == null) {
+ return -1f;
}
+ return regionMetrics.getDataLocality();
}
/**
@@ -159,7 +219,7 @@ class RegionHDFSBlockLocationFinder extends Configured {
return blocksDistribution;
}
} catch (IOException ioe) {
- LOG.warn("IOException during HDFSBlocksDistribution computation. for " + "region = " +
+ LOG.warn("IOException during HDFSBlocksDistribution computation for region = {}",
region.getEncodedName(), ioe);
}
@@ -263,7 +323,7 @@ class RegionHDFSBlockLocationFinder extends Configured {
} catch (InterruptedException ite) {
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
- LOG.debug("ExecutionException during HDFSBlocksDistribution computation. for region = " +
+ LOG.debug("ExecutionException during HDFSBlocksDistribution computation for region = {}",
hregionInfo.getEncodedName(), ee);
}
index++;
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java
index 8e129e3..a011793 100644
--- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java
+++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestRegionHDFSBlockLocationFinder.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.balancer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -31,12 +32,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HDFSBlocksDistribution.HostAndWeight;
+import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -204,4 +207,59 @@ public class TestRegionHDFSBlockLocationFinder {
}
}
}
+
+ @Test
+ public void testRefreshRegionsWithChangedLocality() {
+ ServerName testServer = ServerName.valueOf("host-0", 12345, 12345);
+ RegionInfo testRegion = REGIONS.get(0);
+
+ Map<RegionInfo, HDFSBlocksDistribution> cache = new HashMap<>();
+ for (RegionInfo region : REGIONS) {
+ HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+ assertHostAndWeightEquals(generate(region), hbd);
+ cache.put(region, hbd);
+ }
+
+ finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
+ 0.123f));
+
+ // everything should be cached, because metrics were null before
+ for (RegionInfo region : REGIONS) {
+ HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+ assertSame(cache.get(region), hbd);
+ }
+
+ finder.setClusterMetrics(getMetricsWithLocality(testServer, testRegion.getRegionName(),
+ 0.345f));
+
+ // locality changed just for our test region, so it should no longer be the same
+ for (RegionInfo region : REGIONS) {
+ HDFSBlocksDistribution hbd = finder.getBlockDistribution(region);
+ if (region.equals(testRegion)) {
+ assertNotSame(cache.get(region), hbd);
+ } else {
+ assertSame(cache.get(region), hbd);
+ }
+ }
+ }
+
+ private ClusterMetrics getMetricsWithLocality(ServerName serverName, byte[] region,
+ float locality) {
+ RegionMetrics regionMetrics = mock(RegionMetrics.class);
+ when(regionMetrics.getDataLocality()).thenReturn(locality);
+
+ Map<byte[], RegionMetrics> regionMetricsMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ regionMetricsMap.put(region, regionMetrics);
+
+ ServerMetrics serverMetrics = mock(ServerMetrics.class);
+ when(serverMetrics.getRegionMetrics()).thenReturn(regionMetricsMap);
+
+ Map<ServerName, ServerMetrics> serverMetricsMap = new HashMap<>();
+ serverMetricsMap.put(serverName, serverMetrics);
+
+ ClusterMetrics metrics = mock(ClusterMetrics.class);
+ when(metrics.getLiveServerMetrics()).thenReturn(serverMetricsMap);
+
+ return metrics;
+ }
}
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index ff3f816..d14792e 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -2042,4 +2042,23 @@ possible configurations would overwhelm and obscure the important.
the ring buffer is indicated by config: hbase.master.balancer.rejection.queue.size
</description>
</property>
+ <property>
+ <name>hbase.locality.inputstream.derive.enabled</name>
+ <value>false</value>
+ <description>
+ If true, derive StoreFile locality metrics from the underlying DFSInputStream
+ backing reads for that StoreFile. This value will update as the DFSInputStream's
+ block locations are updated over time. Otherwise, locality is computed on StoreFile
+ open, and cached until the StoreFile is closed.
+ </description>
+ </property>
+ <property>
+ <name>hbase.locality.inputstream.derive.cache.period</name>
+ <value>60000</value>
+ <description>
+ If deriving StoreFile locality metrics from the underlying DFSInputStream, how
+ long (in milliseconds) should the derived values be cached for. The derivation process may involve
+ hitting the namenode, if the DFSInputStream's block list is incomplete.
+ </description>
+ </property>
</configuration>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
index ba84606..ea285ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java
@@ -126,6 +126,10 @@ public class FileLink {
this.in = tryOpen();
}
+ private FSDataInputStream getUnderlyingInputStream() {
+ return in;
+ }
+
@Override
public int read() throws IOException {
int res;
@@ -476,6 +480,17 @@ public class FileLink {
}
/**
+ * If the passed FSDataInputStream is backed by a FileLink, returns the underlying
+ * InputStream for the resolved link target. Otherwise, returns null.
+ */
+ public static FSDataInputStream getUnderlyingFileLinkInputStream(FSDataInputStream stream) {
+ if (stream.getWrappedStream() instanceof FileLinkInputStream) {
+ return ((FileLinkInputStream) stream.getWrappedStream()).getUnderlyingInputStream();
+ }
+ return null;
+ }
+
+ /**
* NOTE: This method must be used only in the constructor!
* It creates a List with the specified locations for the link.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
index 7a74689..ecbc78f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
@@ -29,6 +29,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -127,6 +128,7 @@ public class HStoreFile implements StoreFile {
// StoreFile.Reader
private volatile StoreFileReader initialReader;
+ private volatile InputStreamBlockDistribution initialReaderBlockDistribution = null;
// Block cache configuration and reference.
private final CacheConfig cacheConf;
@@ -344,7 +346,11 @@ public class HStoreFile implements StoreFile {
* file is opened.
*/
public HDFSBlocksDistribution getHDFSBlockDistribution() {
- return this.fileInfo.getHDFSBlockDistribution();
+ if (initialReaderBlockDistribution != null) {
+ return initialReaderBlockDistribution.getHDFSBlockDistribution();
+ } else {
+ return this.fileInfo.getHDFSBlockDistribution();
+ }
}
/**
@@ -362,6 +368,13 @@ public class HStoreFile implements StoreFile {
fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
}
this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
+
+ if (InputStreamBlockDistribution.isEnabled(fileInfo.getConf())) {
+ boolean useHBaseChecksum = context.getInputStreamWrapper().shouldUseHBaseChecksum();
+ FSDataInputStream stream = context.getInputStreamWrapper().getStream(useHBaseChecksum);
+ this.initialReaderBlockDistribution = new InputStreamBlockDistribution(stream, fileInfo);
+ }
+
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java
new file mode 100644
index 0000000..aa15cda
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InputStreamBlockDistribution.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.io.FileLink;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Computes the HDFSBlockDistribution for a file based on the underlying located blocks
+ * for an HdfsDataInputStream reading that file. The backing DFSInputStream.getAllBlocks involves
+ * allocating an array of numBlocks size per call. It may also involve calling the namenode, if
+ * the DFSInputStream has not fetched all the blocks yet. In order to avoid allocation pressure,
+ * we cache the computed distribution for a configurable period of time.
+ * <p>
+ * This class only gets instantiated for the <b>first</b> FSDataInputStream of each StoreFile (i.e.
+ * the one backing {@link HStoreFile#initialReader}). It's then used to dynamically update the
+ * value returned by {@link HStoreFile#getHDFSBlockDistribution()}.
+ * <p>
+ * Once the backing FSDataInputStream is closed, we should not expect the distribution result
+ * to change anymore. This is ok because the initialReader's InputStream is only closed when the
+ * StoreFile itself is closed, at which point nothing will be querying getHDFSBlockDistribution
+ * anymore. If/When the StoreFile is reopened, a new {@link InputStreamBlockDistribution} will
+ * be created for the new initialReader.
+ */
+@InterfaceAudience.Private
+public class InputStreamBlockDistribution {
+ private static final Logger LOG = LoggerFactory.getLogger(InputStreamBlockDistribution.class);
+
+ private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED =
+ "hbase.locality.inputstream.derive.enabled";
+ private static final boolean DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED = false;
+
+ private static final String HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD =
+ "hbase.locality.inputstream.derive.cache.period";
+ private static final int DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD = 60_000;
+
+ private final FSDataInputStream stream;
+ private final StoreFileInfo fileInfo;
+ private final int cachePeriodMs;
+
+ private HDFSBlocksDistribution hdfsBlocksDistribution;
+ private long lastCachedAt;
+ private boolean streamUnsupported;
+
+ /**
+ * This should only be called for the first FSDataInputStream of a StoreFile,
+ * in {@link HStoreFile#open()}.
+ *
+ * @see InputStreamBlockDistribution
+ * @param stream the input stream to derive locality from
+ * @param fileInfo the StoreFileInfo for the related store file
+ */
+ public InputStreamBlockDistribution(FSDataInputStream stream, StoreFileInfo fileInfo) {
+ this.stream = stream;
+ this.fileInfo = fileInfo;
+ this.cachePeriodMs = fileInfo.getConf().getInt(
+ HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD,
+ DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_CACHE_PERIOD);
+ this.lastCachedAt = EnvironmentEdgeManager.currentTime();
+ this.streamUnsupported = false;
+ this.hdfsBlocksDistribution = fileInfo.getHDFSBlockDistribution();
+ }
+
+ /**
+ * True if we should derive StoreFile HDFSBlockDistribution from the underlying input stream
+ */
+ public static boolean isEnabled(Configuration conf) {
+ return conf.getBoolean(HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED,
+ DEFAULT_HBASE_LOCALITY_INPUTSTREAM_DERIVE_ENABLED);
+ }
+
+ /**
+ * Get the HDFSBlocksDistribution derived from the StoreFile input stream, re-computing if cache
+ * is expired.
+ */
+ public synchronized HDFSBlocksDistribution getHDFSBlockDistribution() {
+ if (EnvironmentEdgeManager.currentTime() - lastCachedAt > cachePeriodMs) {
+ try {
+ LOG.debug("Refreshing HDFSBlockDistribution for {}", fileInfo);
+ computeBlockDistribution();
+ } catch (IOException e) {
+ LOG.warn("Failed to recompute block distribution for {}. Falling back on cached value.",
+ fileInfo, e);
+ }
+ }
+ return hdfsBlocksDistribution;
+ }
+
+ private void computeBlockDistribution() throws IOException {
+ lastCachedAt = EnvironmentEdgeManager.currentTime();
+
+ FSDataInputStream stream;
+ if (fileInfo.isLink()) {
+ stream = FileLink.getUnderlyingFileLinkInputStream(this.stream);
+ } else {
+ stream = this.stream;
+ }
+
+ if (!(stream instanceof HdfsDataInputStream)) {
+ if (!streamUnsupported) {
+ LOG.warn("{} for storeFileInfo={}, isLink={}, is not an HdfsDataInputStream so cannot be "
+ + "used to derive locality. Falling back on cached value.",
+ stream, fileInfo, fileInfo.isLink());
+ streamUnsupported = true;
+ }
+ return;
+ }
+
+ streamUnsupported = false;
+ hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution((HdfsDataInputStream) stream);
+ }
+
+ /**
+ * For tests only, sets lastCachedAt so we can force a refresh
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ synchronized void setLastCachedAt(long timestamp) {
+ lastCachedAt = timestamp;
+ }
+
+ /**
+ * For tests only, returns the configured cache period
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ long getCachePeriodMs() {
+ return cachePeriodMs;
+ }
+
+ /**
+ * For tests only, returns whether the passed stream is supported
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ boolean isStreamUnsupported() {
+ return streamUnsupported;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 1156b17..461170d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -81,6 +81,9 @@ import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Progressable;
@@ -704,6 +707,38 @@ public final class FSUtils {
}
/**
+ * Compute HDFS block distribution of a given HdfsDataInputStream. All HdfsDataInputStreams
+ * are backed by a series of LocatedBlocks, which are fetched periodically from the namenode.
+ * This method retrieves those blocks from the input stream and uses them to calculate
+ * HDFSBlockDistribution.
+ *
+ * The underlying method in DFSInputStream does attempt to use locally cached blocks, but
+ * may hit the namenode if the cache is determined to be incomplete. The method also involves
+ * making copies of all LocatedBlocks rather than return the underlying blocks themselves.
+ */
+ static public HDFSBlocksDistribution computeHDFSBlocksDistribution(
+ HdfsDataInputStream inputStream) throws IOException {
+ List<LocatedBlock> blocks = inputStream.getAllBlocks();
+ HDFSBlocksDistribution blocksDistribution = new HDFSBlocksDistribution();
+ for (LocatedBlock block : blocks) {
+ String[] hosts = getHostsForLocations(block);
+ long len = block.getBlockSize();
+ StorageType[] storageTypes = block.getStorageTypes();
+ blocksDistribution.addHostsAndBlockWeight(hosts, len, storageTypes);
+ }
+ return blocksDistribution;
+ }
+
+ private static String[] getHostsForLocations(LocatedBlock block) {
+ DatanodeInfo[] locations = block.getLocations();
+ String[] hosts = new String[locations.length];
+ for (int i = 0; i < hosts.length; i++) {
+ hosts[i] = locations[i].getHostName();
+ }
+ return hosts;
+ }
+
+ /**
* Compute HDFS blocks distribution of a given file, or a portion of the file
* @param fs file system
* @param status file status of the file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
index 1836960..6a85f98 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFileLink.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.ClassRule;
import org.junit.Test;
@@ -89,6 +90,40 @@ public class TestFileLink {
}
/**
+ * Test that the returned link from {@link FileLink#open(FileSystem)} can be unwrapped
+ * to a {@link HdfsDataInputStream} by
+ * {@link FileLink#getUnderlyingFileLinkInputStream(FSDataInputStream)}
+ */
+ @Test
+ public void testGetUnderlyingFSDataInputStream() throws Exception {
+ HBaseTestingUtil testUtil = new HBaseTestingUtil();
+ Configuration conf = testUtil.getConfiguration();
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+ testUtil.startMiniDFSCluster(1);
+ try {
+ MiniDFSCluster cluster = testUtil.getDFSCluster();
+ FileSystem fs = cluster.getFileSystem();
+
+ Path originalPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
+
+ writeSomeData(fs, originalPath, 256 << 20, (byte) 2);
+
+ List<Path> files = new ArrayList<Path>();
+ files.add(originalPath);
+
+ FileLink link = new FileLink(files);
+ FSDataInputStream stream = link.open(fs);
+
+ FSDataInputStream underlying = FileLink.getUnderlyingFileLinkInputStream(stream);
+ assertTrue(underlying instanceof HdfsDataInputStream);
+ } finally {
+ testUtil.shutdownMiniCluster();
+ }
+ }
+
+ /**
* Test, on HDFS, that the FileLink is still readable
* even when the current file gets renamed.
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java
new file mode 100644
index 0000000..2c7872a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestInputStreamBlockDistribution.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.io.FileLink;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ RegionServerTests.class, MediumTests.class})
+public class TestInputStreamBlockDistribution {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class);
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path testPath;
+
+ @Before
+ public void setUp() throws Exception {
+ HBaseTestingUtil testUtil = new HBaseTestingUtil();
+ conf = testUtil.getConfiguration();
+ conf.setInt("dfs.blocksize", 1024 * 1024);
+ conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
+
+ testUtil.startMiniDFSCluster(1);
+ MiniDFSCluster cluster = testUtil.getDFSCluster();
+ fs = cluster.getFileSystem();
+
+ testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
+
+ writeSomeData(fs, testPath, 256 << 20, (byte)2);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fs.delete(testPath, false);
+ fs.close();
+ }
+
+ @Test
+ public void itDerivesLocalityFromHFileInputStream() throws Exception {
+ try (FSDataInputStream stream = fs.open(testPath)) {
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+ InputStreamBlockDistribution test =
+ new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ assertNotSame(initial, test.getHDFSBlockDistribution());
+ }
+
+ }
+
+ @Test
+ public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
+ List<Path> files = new ArrayList<Path>();
+ files.add(testPath);
+
+ FileLink link = new FileLink(files);
+ try (FSDataInputStream stream = link.open(fs)) {
+
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+ InputStreamBlockDistribution test = new InputStreamBlockDistribution(stream,
+ getMockedStoreFileInfo(initial, true));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ assertNotSame(initial, test.getHDFSBlockDistribution());
+ }
+ }
+
+ @Test
+ public void itFallsBackOnLastKnownValueWhenUnsupported() {
+ FSDataInputStream fakeStream = mock(FSDataInputStream.class);
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+ InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
+ getMockedStoreFileInfo(initial, false));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
+ assertSame(initial, test.getHDFSBlockDistribution());
+ assertTrue(test.isStreamUnsupported());
+ }
+
+ @Test
+ public void itFallsBackOnLastKnownValueOnException() throws IOException {
+ HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
+ when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));
+
+ HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
+
+ InputStreamBlockDistribution test = new InputStreamBlockDistribution(fakeStream,
+ getMockedStoreFileInfo(initial, false));
+
+ assertSame(initial, test.getHDFSBlockDistribution());
+ test.setLastCachedAt(test.getCachePeriodMs() + 1);
+
+ // fakeStream throws an exception, so falls back on original
+ assertSame(initial, test.getHDFSBlockDistribution());
+
+ assertFalse(test.isStreamUnsupported());
+ }
+
+ /**
+ * Write up to 'size' bytes with value 'v' into a new file called 'path'.
+ */
+ private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
+ byte[] data = new byte[4096];
+ for (int i = 0; i < data.length; i++) {
+ data[i] = v;
+ }
+
+ FSDataOutputStream stream = fs.create(path);
+ try {
+ long written = 0;
+ while (written < size) {
+ stream.write(data, 0, data.length);
+ written += data.length;
+ }
+ } finally {
+ stream.close();
+ }
+ }
+
+ private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
+ boolean isFileLink) {
+ StoreFileInfo mock = mock(StoreFileInfo.class);
+ when(mock.getHDFSBlockDistribution())
+ .thenReturn(distribution);
+ when(mock.getConf()).thenReturn(conf);
+ when(mock.isLink()).thenReturn(isFileLink);
+ return mock;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 16d4456..f4a4df5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
@@ -105,7 +106,31 @@ public class TestFSUtils {
out.close();
}
- @Test public void testcomputeHDFSBlocksDistribution() throws Exception {
+  /**
+   * Runs the shared block distribution checks with the distribution computed from an
+   * open input stream on the test file.
+   */
+  @Test
+  public void testComputeHDFSBlocksDistributionByInputStream() throws Exception {
+    HDFSBlockDistributionFunction viaOpenStream = (fileSystem, file) -> {
+      try (FSDataInputStream in = fileSystem.open(file)) {
+        // the stream-based overload needs the HDFS-specific stream type
+        assertTrue(in instanceof HdfsDataInputStream);
+        HdfsDataInputStream hdfsIn = (HdfsDataInputStream) in;
+        return FSUtils.computeHDFSBlocksDistribution(hdfsIn);
+      }
+    };
+    testComputeHDFSBlocksDistribution(viaOpenStream);
+  }
+
+  /**
+   * Runs the shared block distribution checks with the distribution computed from the
+   * file's FileStatus, covering the range from offset 0 to the file length.
+   */
+  @Test
+  public void testComputeHDFSBlockDistribution() throws Exception {
+    HDFSBlockDistributionFunction viaFileStatus = (fileSystem, file) -> {
+      FileStatus fileStatus = fileSystem.getFileStatus(file);
+      long length = fileStatus.getLen();
+      return FSUtils.computeHDFSBlocksDistribution(fileSystem, fileStatus, 0, length);
+    };
+    testComputeHDFSBlocksDistribution(viaFileStatus);
+  }
+
+ /**
+ * Strategy for obtaining the HDFSBlocksDistribution of a file, so that the shared test
+ * body can be run against different FSUtils.computeHDFSBlocksDistribution overloads.
+ */
+ @FunctionalInterface
+ interface HDFSBlockDistributionFunction {
+ /**
+ * @param fs file system holding the file
+ * @param path file whose block distribution is wanted
+ * @return the block distribution computed for the file
+ * @throws IOException if the distribution cannot be computed
+ */
+ HDFSBlocksDistribution getForPath(FileSystem fs, Path path) throws IOException;
+ }
+
+ private void testComputeHDFSBlocksDistribution(
+ HDFSBlockDistributionFunction fileToBlockDistribution) throws Exception {
final int DEFAULT_BLOCK_SIZE = 1024;
conf.setLong("dfs.blocksize", DEFAULT_BLOCK_SIZE);
MiniDFSCluster cluster = null;
@@ -129,9 +154,10 @@ public class TestFSUtils {
boolean ok;
do {
ok = true;
- FileStatus status = fs.getFileStatus(testFile);
+
HDFSBlocksDistribution blocksDistribution =
- FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ fileToBlockDistribution.getForPath(fs, testFile);
+
long uniqueBlocksTotalWeight =
blocksDistribution.getUniqueBlocksTotalWeight();
for (String host : hosts) {
@@ -163,9 +189,8 @@ public class TestFSUtils {
long weight;
long uniqueBlocksTotalWeight;
do {
- FileStatus status = fs.getFileStatus(testFile);
HDFSBlocksDistribution blocksDistribution =
- FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ fileToBlockDistribution.getForPath(fs, testFile);
uniqueBlocksTotalWeight = blocksDistribution.getUniqueBlocksTotalWeight();
String tophost = blocksDistribution.getTopHosts().get(0);
@@ -197,8 +222,7 @@ public class TestFSUtils {
final long maxTime = EnvironmentEdgeManager.currentTime() + 2000;
HDFSBlocksDistribution blocksDistribution;
do {
- FileStatus status = fs.getFileStatus(testFile);
- blocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+ blocksDistribution = fileToBlockDistribution.getForPath(fs, testFile);
// NameNode is informed asynchronously, so we may have a delay. See HBASE-6175
}
while (blocksDistribution.getTopHosts().size() != 3 &&