You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/03/28 19:35:31 UTC
[03/48] hadoop git commit: HDFS-9579. Provide
bytes-read-by-network-distance metrics at FileSystem.Statistics level (Ming
Ma via sjlee)
HDFS-9579. Provide bytes-read-by-network-distance metrics at FileSystem.Statistics level (Ming Ma via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd8b6889
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd8b6889
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd8b6889
Branch: refs/heads/HDFS-7240
Commit: cd8b6889a74a949e37f4b2eb664cdf3b59bfb93b
Parents: 33239c9
Author: Sangjin Lee <sj...@apache.org>
Authored: Sat Mar 19 14:02:04 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Sat Mar 19 14:02:04 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/FileSystem.java | 118 ++++++++++++++++++-
.../java/org/apache/hadoop/net/NetUtils.java | 16 ++-
.../org/apache/hadoop/net/NetworkTopology.java | 17 ++-
.../java/org/apache/hadoop/net/NodeBase.java | 18 ++-
.../org/apache/hadoop/hdfs/BlockReader.java | 10 +-
.../apache/hadoop/hdfs/BlockReaderFactory.java | 7 +-
.../apache/hadoop/hdfs/BlockReaderLocal.java | 10 +-
.../hadoop/hdfs/BlockReaderLocalLegacy.java | 10 +-
.../org/apache/hadoop/hdfs/ClientContext.java | 56 ++++++++-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 11 +-
.../org/apache/hadoop/hdfs/DFSInputStream.java | 14 +--
.../hadoop/hdfs/DFSStripedInputStream.java | 3 -
.../apache/hadoop/hdfs/ExternalBlockReader.java | 10 +-
.../apache/hadoop/hdfs/RemoteBlockReader.java | 29 ++---
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 29 ++---
.../org/apache/hadoop/hdfs/ReplicaAccessor.java | 7 ++
.../erasurecode/ErasureCodingWorker.java | 3 +-
.../hadoop/fs/TestEnhancedByteBufferAccess.java | 4 +-
.../hadoop/hdfs/TestBlockReaderLocal.java | 4 +-
.../org/apache/hadoop/hdfs/TestConnCache.java | 2 -
.../hadoop/hdfs/TestDistributedFileSystem.java | 62 ++++++++++
.../hadoop/hdfs/TestExternalBlockReader.java | 8 +-
.../apache/hadoop/net/TestNetworkTopology.java | 7 ++
23 files changed, 368 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index a96ea40..a8a5c6d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -3023,11 +3023,15 @@ public abstract class FileSystem extends Configured implements Closeable {
* need.
*/
public static class StatisticsData {
- volatile long bytesRead;
- volatile long bytesWritten;
- volatile int readOps;
- volatile int largeReadOps;
- volatile int writeOps;
+ private volatile long bytesRead;
+ private volatile long bytesWritten;
+ private volatile int readOps;
+ private volatile int largeReadOps;
+ private volatile int writeOps;
+ private volatile long bytesReadLocalHost;
+ private volatile long bytesReadDistanceOfOneOrTwo;
+ private volatile long bytesReadDistanceOfThreeOrFour;
+ private volatile long bytesReadDistanceOfFiveOrLarger;
/**
* Add another StatisticsData object to this one.
@@ -3038,6 +3042,12 @@ public abstract class FileSystem extends Configured implements Closeable {
this.readOps += other.readOps;
this.largeReadOps += other.largeReadOps;
this.writeOps += other.writeOps;
+ this.bytesReadLocalHost += other.bytesReadLocalHost;
+ this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo;
+ this.bytesReadDistanceOfThreeOrFour +=
+ other.bytesReadDistanceOfThreeOrFour;
+ this.bytesReadDistanceOfFiveOrLarger +=
+ other.bytesReadDistanceOfFiveOrLarger;
}
/**
@@ -3049,6 +3059,12 @@ public abstract class FileSystem extends Configured implements Closeable {
this.readOps = -this.readOps;
this.largeReadOps = -this.largeReadOps;
this.writeOps = -this.writeOps;
+ this.bytesReadLocalHost = -this.bytesReadLocalHost;
+ this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo;
+ this.bytesReadDistanceOfThreeOrFour =
+ -this.bytesReadDistanceOfThreeOrFour;
+ this.bytesReadDistanceOfFiveOrLarger =
+ -this.bytesReadDistanceOfFiveOrLarger;
}
@Override
@@ -3077,6 +3093,22 @@ public abstract class FileSystem extends Configured implements Closeable {
public int getWriteOps() {
return writeOps;
}
+
+ public long getBytesReadLocalHost() {
+ return bytesReadLocalHost;
+ }
+
+ public long getBytesReadDistanceOfOneOrTwo() {
+ return bytesReadDistanceOfOneOrTwo;
+ }
+
+ public long getBytesReadDistanceOfThreeOrFour() {
+ return bytesReadDistanceOfThreeOrFour;
+ }
+
+ public long getBytesReadDistanceOfFiveOrLarger() {
+ return bytesReadDistanceOfFiveOrLarger;
+ }
}
private interface StatisticsAggregator<T> {
@@ -3268,6 +3300,33 @@ public abstract class FileSystem extends Configured implements Closeable {
}
/**
+ * Increment the bytes read by the network distance in the statistics.
+ * In the common network topology setup, distance value should be an even
+ * number such as 0, 2, 4, 6. To make it more general, we group distance
+ * by {1, 2}, {3, 4} and {5 and beyond} for accounting.
+ * @param distance the network distance
+ * @param newBytes the additional bytes read
+ */
+ public void incrementBytesReadByDistance(int distance, long newBytes) {
+ switch (distance) {
+ case 0:
+ getThreadStatistics().bytesReadLocalHost += newBytes;
+ break;
+ case 1:
+ case 2:
+ getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes;
+ break;
+ case 3:
+ case 4:
+ getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes;
+ break;
+ default:
+ getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes;
+ break;
+ }
+ }
+
+ /**
* Apply the given aggregator to all StatisticsData objects associated with
* this Statistics object.
*
@@ -3384,6 +3443,55 @@ public abstract class FileSystem extends Configured implements Closeable {
});
}
+ /**
+ * In the common network topology setup, distance value should be an even
+ * number such as 0, 2, 4, 6. To make it more general, we group distance
+ * by {1, 2}, {3, 4} and {5 and beyond} for accounting. So if the caller
+ * asks for bytes read for distance 2, the function will return the value
+ * for group {1, 2}.
+ * @param distance the network distance
+ * @return the total number of bytes read by the network distance
+ */
+ public long getBytesReadByDistance(int distance) {
+ long bytesRead;
+ switch (distance) {
+ case 0:
+ bytesRead = getData().getBytesReadLocalHost();
+ break;
+ case 1:
+ case 2:
+ bytesRead = getData().getBytesReadDistanceOfOneOrTwo();
+ break;
+ case 3:
+ case 4:
+ bytesRead = getData().getBytesReadDistanceOfThreeOrFour();
+ break;
+ default:
+ bytesRead = getData().getBytesReadDistanceOfFiveOrLarger();
+ break;
+ }
+ return bytesRead;
+ }
+
+ /**
+ * Get all statistics data.
+ * MR or other frameworks can use the method to get all statistics at once.
+ * @return the StatisticsData
+ */
+ public StatisticsData getData() {
+ return visitAll(new StatisticsAggregator<StatisticsData>() {
+ private StatisticsData all = new StatisticsData();
+
+ @Override
+ public void accept(StatisticsData data) {
+ all.add(data);
+ }
+
+ public StatisticsData aggregate() {
+ return all;
+ }
+ });
+ }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
index e475149..2c3661a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
@@ -638,13 +638,27 @@ public class NetUtils {
/**
* Return hostname without throwing exception.
+ * The returned hostname String format is "hostname".
+ * @return hostname
+ */
+ public static String getLocalHostname() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch(UnknownHostException uhe) {
+ return "" + uhe;
+ }
+ }
+
+ /**
+ * Return hostname without throwing exception.
+ * The returned hostname String format is "hostname/ip address".
* @return hostname
*/
public static String getHostname() {
try {return "" + InetAddress.getLocalHost();}
catch(UnknownHostException uhe) {return "" + uhe;}
}
-
+
/**
* Compose a "host:port" string from the address.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index b637da1..e1d2968 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -369,6 +369,16 @@ public class NetworkTopology {
int getNumOfLeaves() {
return numOfLeaves;
}
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object to) {
+ return super.equals(to);
+ }
} // end of InnerNode
/**
@@ -607,9 +617,14 @@ public class NetworkTopology {
* or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
*/
public int getDistance(Node node1, Node node2) {
- if (node1 == node2) {
+ if ((node1 != null && node1.equals(node2)) ||
+ (node1 == null && node2 == null)) {
return 0;
}
+ if (node1 == null || node2 == null) {
+ LOG.warn("One of the nodes is a null pointer");
+ return Integer.MAX_VALUE;
+ }
Node n1=node1, n2=node2;
int dis = 0;
netlock.readLock().lock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
index b136297..b465098 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
@@ -112,7 +112,23 @@ public class NodeBase implements Node {
public static String getPath(Node node) {
return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
}
-
+
+ @Override
+ public boolean equals(Object to) {
+ if (this == to) {
+ return true;
+ }
+ if (!(to instanceof NodeBase)) {
+ return false;
+ }
+ return getPath(this).equals(getPath((NodeBase)to));
+ }
+
+ @Override
+ public int hashCode() {
+ return getPath(this).hashCode();
+ }
+
/** @return this node's path as its string representation */
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index 150cf23..63acaa7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -84,11 +84,6 @@ public interface BlockReader extends ByteBufferReadable, Closeable {
int readAll(byte[] buf, int offset, int len) throws IOException;
/**
- * @return true only if this is a local read.
- */
- boolean isLocal();
-
- /**
* @return true only if this is a short-circuit read.
* All short-circuit reads are also local.
*/
@@ -107,4 +102,9 @@ public interface BlockReader extends ByteBufferReadable, Closeable {
* @return The DataChecksum used by the read block
*/
DataChecksum getDataChecksum();
+
+ /**
+ * Return the network distance between local machine and the remote machine.
+ */
+ int getNetworkDistance();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 5c7bbd7..8a0050f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -833,16 +833,19 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
@SuppressWarnings("deprecation")
private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+ int networkDistance = clientContext.getNetworkDistance(datanode);
if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
return RemoteBlockReader.newBlockReader(fileName,
block, token, startOffset, length, conf.getIoBufferSize(),
verifyChecksum, clientName, peer, datanode,
- clientContext.getPeerCache(), cachingStrategy, tracer);
+ clientContext.getPeerCache(), cachingStrategy, tracer,
+ networkDistance);
} else {
return RemoteBlockReader2.newBlockReader(
fileName, block, token, startOffset, length,
verifyChecksum, clientName, peer, datanode,
- clientContext.getPeerCache(), cachingStrategy, tracer);
+ clientContext.getPeerCache(), cachingStrategy, tracer,
+ networkDistance);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index 859380c..68630c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -641,11 +641,6 @@ class BlockReaderLocal implements BlockReader {
}
@Override
- public boolean isLocal() {
- return true;
- }
-
- @Override
public boolean isShortCircuit() {
return true;
}
@@ -721,4 +716,9 @@ class BlockReaderLocal implements BlockReader {
public DataChecksum getDataChecksum() {
return checksum;
}
+
+ @Override
+ public int getNetworkDistance() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 7206c07..65a8373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -723,11 +723,6 @@ class BlockReaderLocalLegacy implements BlockReader {
}
@Override
- public boolean isLocal() {
- return true;
- }
-
- @Override
public boolean isShortCircuit() {
return true;
}
@@ -741,4 +736,9 @@ class BlockReaderLocalLegacy implements BlockReader {
public DataChecksum getDataChecksum() {
return checksum;
}
+
+ @Override
+ public int getNetworkDistance() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index 047645b..47d6d49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -17,16 +17,28 @@
*/
package org.apache.hadoop.hdfs;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -101,7 +113,12 @@ public class ClientContext {
*/
private boolean printedConfWarning = false;
- private ClientContext(String name, DfsClientConf conf) {
+ private final NetworkTopology topology;
+ private final NodeBase clientNode;
+ private final Map<NodeBase, Integer> nodeToDistance;
+
+ private ClientContext(String name, DfsClientConf conf,
+ Configuration config) {
final ShortCircuitConf scConf = conf.getShortCircuitConf();
this.name = name;
@@ -116,14 +133,28 @@ public class ClientContext {
this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
+
+ DNSToSwitchMapping dnsToSwitchMapping = ReflectionUtils.newInstance(
+ config.getClass(
+ CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ ScriptBasedMapping.class, DNSToSwitchMapping.class), config);
+ List<String> nodes = new ArrayList<>();
+ String clientHostName = NetUtils.getLocalHostname();
+ nodes.add(clientHostName);
+ clientNode = new NodeBase(clientHostName,
+ dnsToSwitchMapping.resolve(nodes).get(0));
+ this.topology = NetworkTopology.getInstance(config);
+ this.topology.add(clientNode);
+ this.nodeToDistance = new ConcurrentHashMap<>();
}
- public static ClientContext get(String name, DfsClientConf conf) {
+ public static ClientContext get(String name, DfsClientConf conf,
+ Configuration config) {
ClientContext context;
synchronized(ClientContext.class) {
context = CACHES.get(name);
if (context == null) {
- context = new ClientContext(name, conf);
+ context = new ClientContext(name, conf, config);
CACHES.put(name, context);
} else {
context.printConfWarningIfNeeded(conf);
@@ -132,6 +163,10 @@ public class ClientContext {
return context;
}
+ public static ClientContext get(String name, Configuration config) {
+ return get(name, new DfsClientConf(config), config);
+ }
+
/**
* Get a client context, from a Configuration object.
*
@@ -141,8 +176,7 @@ public class ClientContext {
@VisibleForTesting
public static ClientContext getFromConf(Configuration conf) {
return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
- HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
- new DfsClientConf(conf));
+ HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), conf);
}
private void printConfWarningIfNeeded(DfsClientConf conf) {
@@ -193,4 +227,16 @@ public class ClientContext {
public ByteArrayManager getByteArrayManager() {
return byteArrayManager;
}
+
+ public int getNetworkDistance(DatanodeInfo datanodeInfo) {
+ NodeBase node = new NodeBase(datanodeInfo.getHostName(),
+ datanodeInfo.getNetworkLocation());
+ Integer distance = nodeToDistance.get(node);
+ if (distance == null) {
+ topology.add(node);
+ distance = topology.getDistance(clientNode, node);
+ nodeToDistance.put(node, distance);
+ }
+ return distance;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0976920..3506d3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -212,7 +212,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final String clientName;
final SocketFactory socketFactory;
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
- final FileSystem.Statistics stats;
+ private final FileSystem.Statistics stats;
private final String authority;
private final Random r = new Random();
private SocketAddress[] localInterfaceAddrs;
@@ -357,7 +357,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
new CachingStrategy(writeDropBehind, readahead);
this.clientContext = ClientContext.get(
conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
- dfsClientConf);
+ dfsClientConf, conf);
if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
this.initThreadsNumForHedgedReads(dfsClientConf.
@@ -2740,6 +2740,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
}
+ void updateFileSystemReadStats(int distance, int nRead) {
+ if (stats != null) {
+ stats.incrementBytesRead(nRead);
+ stats.incrementBytesReadByDistance(distance, nRead);
+ }
+ }
+
/**
* Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
* it does not already exist.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d713e8f..7661e82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -775,7 +775,7 @@ public class DFSInputStream extends FSInputStream
synchronized(infoLock) {
if (blockReader.isShortCircuit()) {
readStatistics.addShortCircuitBytes(nRead);
- } else if (blockReader.isLocal()) {
+ } else if (blockReader.getNetworkDistance() == 0) {
readStatistics.addLocalBytes(nRead);
} else {
readStatistics.addRemoteBytes(nRead);
@@ -798,6 +798,8 @@ public class DFSInputStream extends FSInputStream
throws IOException {
int nRead = blockReader.read(buf, off, len);
updateReadStatistics(readStatistics, nRead, blockReader);
+ dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+ nRead);
return nRead;
}
@@ -828,6 +830,8 @@ public class DFSInputStream extends FSInputStream
int ret = blockReader.read(buf);
success = true;
updateReadStatistics(readStatistics, ret, blockReader);
+ dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+ ret);
if (ret == 0) {
DFSClient.LOG.warn("zero");
}
@@ -939,9 +943,6 @@ public class DFSInputStream extends FSInputStream
// got a EOS from reader though we expect more data on it.
throw new IOException("Unexpected EOS from the reader");
}
- if (dfsClient.stats != null) {
- dfsClient.stats.incrementBytesRead(result);
- }
return result;
} catch (ChecksumException ce) {
throw ce;
@@ -1194,6 +1195,8 @@ public class DFSInputStream extends FSInputStream
datanode.storageType, datanode.info);
int nread = reader.readAll(buf, offset, len);
updateReadStatistics(readStatistics, nread, reader);
+ dfsClient.updateFileSystemReadStats(
+ reader.getNetworkDistance(), nread);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
@@ -1479,9 +1482,6 @@ public class DFSInputStream extends FSInputStream
offset += bytesToRead;
}
assert remaining == 0 : "Wrong number of bytes read.";
- if (dfsClient.stats != null) {
- dfsClient.stats.incrementBytesRead(realLen);
- }
return realLen;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f6547f3..38236ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -447,9 +447,6 @@ public class DFSStripedInputStream extends DFSInputStream {
result += ret;
pos += ret;
}
- if (dfsClient.stats != null) {
- dfsClient.stats.incrementBytesRead(result);
- }
return result;
} finally {
// Check if need to report block replicas corruption either read
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
index 42bec5c..707a56a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
@@ -110,11 +110,6 @@ public final class ExternalBlockReader implements BlockReader {
}
@Override
- public boolean isLocal() {
- return accessor.isLocal();
- }
-
- @Override
public boolean isShortCircuit() {
return accessor.isShortCircuit();
}
@@ -129,4 +124,9 @@ public final class ExternalBlockReader implements BlockReader {
public DataChecksum getDataChecksum() {
return null;
}
+
+ @Override
+ public int getNetworkDistance() {
+ return accessor.getNetworkDistance();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 544e1b3..7e094f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.TraceScope;
@@ -93,11 +92,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
*/
private final long bytesNeededToFinish;
- /**
- * True if we are reading from a local DataNode.
- */
- private final boolean isLocal;
-
private boolean eos = false;
private boolean sentStatusCode = false;
@@ -109,6 +103,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
private final Tracer tracer;
+ private final int networkDistance;
+
/* FSInputChecker interface */
/* same interface as inputStream java.io.InputStream#read()
@@ -342,7 +338,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
private RemoteBlockReader(String file, String bpid, long blockId,
DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
- DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
+ DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+ int networkDistance) {
// Path is used only for printing block and file information in debug
super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
":" + bpid + ":of:"+ file)/*too non path-like?*/,
@@ -351,9 +348,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
checksum.getBytesPerChecksum(),
checksum.getChecksumSize());
- this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
- createSocketAddr(datanodeID.getXferAddr()));
-
this.peer = peer;
this.datanodeID = datanodeID;
this.in = in;
@@ -375,6 +369,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
checksumSize = this.checksum.getChecksumSize();
this.peerCache = peerCache;
this.tracer = tracer;
+ this.networkDistance = networkDistance;
}
/**
@@ -400,7 +395,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
DatanodeID datanodeID,
PeerCache peerCache,
CachingStrategy cachingStrategy,
- Tracer tracer)
+ Tracer tracer, int networkDistance)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out =
@@ -436,7 +431,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
- peer, datanodeID, peerCache, tracer);
+ peer, datanodeID, peerCache, tracer, networkDistance);
}
@Override
@@ -494,11 +489,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
}
@Override
- public boolean isLocal() {
- return isLocal;
- }
-
- @Override
public boolean isShortCircuit() {
return false;
}
@@ -512,4 +502,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public DataChecksum getDataChecksum() {
return checksum;
}
+
+ @Override
+ public int getNetworkDistance() {
+ return networkDistance;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 22e4757..9437353 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.htrace.core.TraceScope;
@@ -116,17 +115,14 @@ public class RemoteBlockReader2 implements BlockReader {
*/
private long bytesNeededToFinish;
- /**
- * True if we are reading from a local DataNode.
- */
- private final boolean isLocal;
-
private final boolean verifyChecksum;
private boolean sentStatusCode = false;
private final Tracer tracer;
+ private final int networkDistance;
+
@VisibleForTesting
public Peer getPeer() {
return peer;
@@ -280,9 +276,8 @@ public class RemoteBlockReader2 implements BlockReader {
protected RemoteBlockReader2(String file, long blockId,
DataChecksum checksum, boolean verifyChecksum,
long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
- DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
- this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
- createSocketAddr(datanodeID.getXferAddr()));
+ DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+ int networkDistance) {
// Path is used only for printing block and file information in debug
this.peer = peer;
this.datanodeID = datanodeID;
@@ -302,6 +297,7 @@ public class RemoteBlockReader2 implements BlockReader {
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
this.tracer = tracer;
+ this.networkDistance = networkDistance;
}
@@ -397,7 +393,8 @@ public class RemoteBlockReader2 implements BlockReader {
Peer peer, DatanodeID datanodeID,
PeerCache peerCache,
CachingStrategy cachingStrategy,
- Tracer tracer) throws IOException {
+ Tracer tracer,
+ int networkDistance) throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
@@ -430,7 +427,7 @@ public class RemoteBlockReader2 implements BlockReader {
return new RemoteBlockReader2(file, block.getBlockId(), checksum,
verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
- peerCache, tracer);
+ peerCache, tracer, networkDistance);
}
static void checkSuccess(
@@ -454,11 +451,6 @@ public class RemoteBlockReader2 implements BlockReader {
}
@Override
- public boolean isLocal() {
- return isLocal;
- }
-
- @Override
public boolean isShortCircuit() {
return false;
}
@@ -472,4 +464,9 @@ public class RemoteBlockReader2 implements BlockReader {
public DataChecksum getDataChecksum() {
return checksum;
}
+
+ @Override
+ public int getNetworkDistance() {
+ return networkDistance;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
index e0b21e8..556c2c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
@@ -87,4 +87,11 @@ public abstract class ReplicaAccessor {
* short-circuit byte count statistics.
*/
public abstract boolean isShortCircuit();
+
+ /**
+ * Return the network distance between the local machine and the remote machine. This default implementation returns 0 when isLocal() is true and Integer.MAX_VALUE otherwise.
+ */
+ public int getNetworkDistance() {
+ return isLocal() ? 0 : Integer.MAX_VALUE;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index bde8d80..74fb3e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -849,12 +849,13 @@ public final class ErasureCodingWorker {
* read directly from DN and need to check the replica is FINALIZED
* state, notice we should not use short-circuit local read which
* requires config for domain-socket in UNIX or legacy config in Windows.
+ * The network distance value isn't used for this scenario.
*/
return RemoteBlockReader2.newBlockReader(
"dummy", block, blockToken, offsetInBlock,
block.getNumBytes() - offsetInBlock, true,
"", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
- null, cachingStrategy, datanode.getTracer());
+ null, cachingStrategy, datanode.getTracer(), -1);
} catch (IOException e) {
LOG.debug("Exception while creating remote block reader, datanode {}",
dnInfo, e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
index 3455f55..a1af1fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
@@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
fsIn.close();
fsIn = fs.open(TEST_PATH);
final ShortCircuitCache cache = ClientContext.get(
- CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
+ CONTEXT, conf).getShortCircuitCache();
cache.accept(new CountingVisitor(0, 5, 5, 0));
results[0] = fsIn.read(null, BLOCK_SIZE,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@@ -661,7 +661,7 @@ public class TestEnhancedByteBufferAccess {
final ExtendedBlock firstBlock =
DFSTestUtil.getFirstBlock(fs, TEST_PATH);
final ShortCircuitCache cache = ClientContext.get(
- CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
+ CONTEXT, conf).getShortCircuitCache();
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
// Uncache the replica
fs.removeCacheDirective(directiveId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
index 2d6c63a..0048d2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.util.Time;
@@ -736,7 +737,8 @@ public class TestBlockReaderLocal {
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
FileSystem fs = null;
try {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster = new MiniDFSCluster.Builder(conf).
+ hosts(new String[] {NetUtils.getLocalHostname()}).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
index afa5d27..8d2398d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
@@ -99,8 +99,6 @@ public class TestConnCache {
DFSClient client = new DFSClient(
new InetSocketAddress("localhost",
util.getCluster().getNameNodePort()), util.getConf());
- ClientContext cacheContext =
- ClientContext.get(contextName, client.getConf());
DFSInputStream in = client.open(testFile.toString());
LOG.info("opened " + testFile.toString());
byte[] dataBuf = new byte[BLOCK_SIZE];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 6217c45..1db0da8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -651,6 +652,67 @@ public class TestDistributedFileSystem {
assertEquals(largeReadOps, DFSTestUtil.getStatistics(fs).getLargeReadOps());
}
+ /** Checks read statistics. */
+ private void checkReadStatistics(FileSystem fs, int distance, long expectedReadBytes) {
+ long bytesRead = DFSTestUtil.getStatistics(fs).
+ getBytesReadByDistance(distance);
+ assertEquals(expectedReadBytes, bytesRead);
+ }
+
+ @Test
+ public void testLocalHostReadStatistics() throws Exception {
+ testReadFileSystemStatistics(0);
+ }
+
+ @Test
+ public void testLocalRackReadStatistics() throws Exception {
+ testReadFileSystemStatistics(2);
+ }
+
+ @Test
+ public void testRemoteRackOfFirstDegreeReadStatistics() throws Exception {
+ testReadFileSystemStatistics(4);
+ }
+
+ /** Reads a file from a cluster whose dn is at expectedDistance from the
+ * client and checks the bytes read at that distance. Handled values:
+ * 0 means local host. 2 means same rack.
+ * 4 means remote rack of first degree.
+ */
+ private void testReadFileSystemStatistics(int expectedDistance)
+ throws IOException {
+ MiniDFSCluster cluster = null;
+ final Configuration conf = getTestConfiguration();
+
+ // create a cluster with a dn with the expected distance.
+ if (expectedDistance == 0) {
+ cluster = new MiniDFSCluster.Builder(conf).
+ hosts(new String[] {NetUtils.getLocalHostname()}).build();
+ } else if (expectedDistance == 2) {
+ cluster = new MiniDFSCluster.Builder(conf).
+ hosts(new String[] {"hostFoo"}).build();
+ } else if (expectedDistance == 4) {
+ cluster = new MiniDFSCluster.Builder(conf).
+ racks(new String[] {"/rackFoo"}).build();
+ }
+
+ // create a file, read the file and verify the metrics
+ try {
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.getStatistics(fs).reset();
+ Path dir = new Path("/test");
+ Path file = new Path(dir, "file");
+ String input = "hello world";
+ DFSTestUtil.writeFile(fs, file, input);
+ FSDataInputStream stm = fs.open(file);
+ byte[] actual = new byte[4096];
+ stm.read(actual);
+ checkReadStatistics(fs, expectedDistance, input.length());
+ } finally {
+ if (cluster != null) cluster.shutdown();
+ }
+ }
+
@Test
public void testFileChecksum() throws Exception {
final long seed = RAN.nextLong();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
index 2c36baa..5c2b6da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -246,6 +247,11 @@ public class TestExternalBlockReader {
return true;
}
+ @Override
+ public int getNetworkDistance() {
+ return 0;
+ }
+
synchronized String getError() {
return error;
}
@@ -271,7 +277,7 @@ public class TestExternalBlockReader {
String uuid = UUID.randomUUID().toString();
conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(1)
+ .hosts(new String[] {NetUtils.getLocalHostname()})
.build();
final int TEST_LENGTH = 2047;
DistributedFileSystem dfs = cluster.getFileSystem();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index 45f6cb4..736230c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -129,6 +129,13 @@ public class TestNetworkTopology {
assertEquals(cluster.getDistance(dataNodes[0], dataNodes[1]), 2);
assertEquals(cluster.getDistance(dataNodes[0], dataNodes[3]), 4);
assertEquals(cluster.getDistance(dataNodes[0], dataNodes[6]), 6);
+ // verify the distance is zero as long as two nodes have the same path.
+ // They don't need to refer to the same object.
+ NodeBase node1 = new NodeBase(dataNodes[0].getHostName(),
+ dataNodes[0].getNetworkLocation());
+ NodeBase node2 = new NodeBase(dataNodes[0].getHostName(),
+ dataNodes[0].getNetworkLocation());
+ assertEquals(0, cluster.getDistance(node1, node2));
}
@Test