Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2016/03/21 21:14:53 UTC

[40/50] [abbrv] hadoop git commit: HDFS-9579. Provide bytes-read-by-network-distance metrics at FileSystem.Statistics level (Ming Ma via sjlee)

HDFS-9579. Provide bytes-read-by-network-distance metrics at FileSystem.Statistics level (Ming Ma via sjlee)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cd8b6889
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cd8b6889
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cd8b6889

Branch: refs/heads/YARN-3368
Commit: cd8b6889a74a949e37f4b2eb664cdf3b59bfb93b
Parents: 33239c9
Author: Sangjin Lee <sj...@apache.org>
Authored: Sat Mar 19 14:02:04 2016 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Sat Mar 19 14:02:04 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/FileSystem.java   | 118 ++++++++++++++++++-
 .../java/org/apache/hadoop/net/NetUtils.java    |  16 ++-
 .../org/apache/hadoop/net/NetworkTopology.java  |  17 ++-
 .../java/org/apache/hadoop/net/NodeBase.java    |  18 ++-
 .../org/apache/hadoop/hdfs/BlockReader.java     |  10 +-
 .../apache/hadoop/hdfs/BlockReaderFactory.java  |   7 +-
 .../apache/hadoop/hdfs/BlockReaderLocal.java    |  10 +-
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     |  10 +-
 .../org/apache/hadoop/hdfs/ClientContext.java   |  56 ++++++++-
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  11 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  |  14 +--
 .../hadoop/hdfs/DFSStripedInputStream.java      |   3 -
 .../apache/hadoop/hdfs/ExternalBlockReader.java |  10 +-
 .../apache/hadoop/hdfs/RemoteBlockReader.java   |  29 ++---
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  |  29 ++---
 .../org/apache/hadoop/hdfs/ReplicaAccessor.java |   7 ++
 .../erasurecode/ErasureCodingWorker.java        |   3 +-
 .../hadoop/fs/TestEnhancedByteBufferAccess.java |   4 +-
 .../hadoop/hdfs/TestBlockReaderLocal.java       |   4 +-
 .../org/apache/hadoop/hdfs/TestConnCache.java   |   2 -
 .../hadoop/hdfs/TestDistributedFileSystem.java  |  62 ++++++++++
 .../hadoop/hdfs/TestExternalBlockReader.java    |   8 +-
 .../apache/hadoop/net/TestNetworkTopology.java  |   7 ++
 23 files changed, 368 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index a96ea40..a8a5c6d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -3023,11 +3023,15 @@ public abstract class FileSystem extends Configured implements Closeable {
      * need.
      */
     public static class StatisticsData {
-      volatile long bytesRead;
-      volatile long bytesWritten;
-      volatile int readOps;
-      volatile int largeReadOps;
-      volatile int writeOps;
+      private volatile long bytesRead;
+      private volatile long bytesWritten;
+      private volatile int readOps;
+      private volatile int largeReadOps;
+      private volatile int writeOps;
+      private volatile long bytesReadLocalHost;
+      private volatile long bytesReadDistanceOfOneOrTwo;
+      private volatile long bytesReadDistanceOfThreeOrFour;
+      private volatile long bytesReadDistanceOfFiveOrLarger;
 
       /**
        * Add another StatisticsData object to this one.
@@ -3038,6 +3042,12 @@ public abstract class FileSystem extends Configured implements Closeable {
         this.readOps += other.readOps;
         this.largeReadOps += other.largeReadOps;
         this.writeOps += other.writeOps;
+        this.bytesReadLocalHost += other.bytesReadLocalHost;
+        this.bytesReadDistanceOfOneOrTwo += other.bytesReadDistanceOfOneOrTwo;
+        this.bytesReadDistanceOfThreeOrFour +=
+            other.bytesReadDistanceOfThreeOrFour;
+        this.bytesReadDistanceOfFiveOrLarger +=
+            other.bytesReadDistanceOfFiveOrLarger;
       }
 
       /**
@@ -3049,6 +3059,12 @@ public abstract class FileSystem extends Configured implements Closeable {
         this.readOps = -this.readOps;
         this.largeReadOps = -this.largeReadOps;
         this.writeOps = -this.writeOps;
+        this.bytesReadLocalHost = -this.bytesReadLocalHost;
+        this.bytesReadDistanceOfOneOrTwo = -this.bytesReadDistanceOfOneOrTwo;
+        this.bytesReadDistanceOfThreeOrFour =
+            -this.bytesReadDistanceOfThreeOrFour;
+        this.bytesReadDistanceOfFiveOrLarger =
+            -this.bytesReadDistanceOfFiveOrLarger;
       }
 
       @Override
@@ -3077,6 +3093,22 @@ public abstract class FileSystem extends Configured implements Closeable {
       public int getWriteOps() {
         return writeOps;
       }
+
+      public long getBytesReadLocalHost() {
+        return bytesReadLocalHost;
+      }
+
+      public long getBytesReadDistanceOfOneOrTwo() {
+        return bytesReadDistanceOfOneOrTwo;
+      }
+
+      public long getBytesReadDistanceOfThreeOrFour() {
+        return bytesReadDistanceOfThreeOrFour;
+      }
+
+      public long getBytesReadDistanceOfFiveOrLarger() {
+        return bytesReadDistanceOfFiveOrLarger;
+      }
     }
 
     private interface StatisticsAggregator<T> {
@@ -3268,6 +3300,33 @@ public abstract class FileSystem extends Configured implements Closeable {
     }
 
     /**
+     * Increment the bytes read by the network distance in the statistics.
+     * In the common network topology setup, the distance value should be an
+     * even number such as 0, 2, 4, or 6. To make it more general, we group
+     * distances into {1, 2}, {3, 4} and {5 and beyond} for accounting.
+     * @param distance the network distance
+     * @param newBytes the additional bytes read
+     */
+    public void incrementBytesReadByDistance(int distance, long newBytes) {
+      switch (distance) {
+      case 0:
+        getThreadStatistics().bytesReadLocalHost += newBytes;
+        break;
+      case 1:
+      case 2:
+        getThreadStatistics().bytesReadDistanceOfOneOrTwo += newBytes;
+        break;
+      case 3:
+      case 4:
+        getThreadStatistics().bytesReadDistanceOfThreeOrFour += newBytes;
+        break;
+      default:
+        getThreadStatistics().bytesReadDistanceOfFiveOrLarger += newBytes;
+        break;
+      }
+    }
+
+    /**
      * Apply the given aggregator to all StatisticsData objects associated with
      * this Statistics object.
      *
@@ -3384,6 +3443,55 @@ public abstract class FileSystem extends Configured implements Closeable {
       });
     }
 
+    /**
+     * In the common network topology setup, the distance value should be an
+     * even number such as 0, 2, 4, or 6. To make it more general, we group
+     * distances into {1, 2}, {3, 4} and {5 and beyond} for accounting. So if
+     * the caller asks for the bytes read for distance 2, the function returns
+     * the value for group {1, 2}.
+     * @param distance the network distance
+     * @return the total number of bytes read by the network distance
+     */
+    public long getBytesReadByDistance(int distance) {
+      long bytesRead;
+      switch (distance) {
+      case 0:
+        bytesRead = getData().getBytesReadLocalHost();
+        break;
+      case 1:
+      case 2:
+        bytesRead = getData().getBytesReadDistanceOfOneOrTwo();
+        break;
+      case 3:
+      case 4:
+        bytesRead = getData().getBytesReadDistanceOfThreeOrFour();
+        break;
+      default:
+        bytesRead = getData().getBytesReadDistanceOfFiveOrLarger();
+        break;
+      }
+      return bytesRead;
+    }
+
+    /**
+     * Get all statistics data.
+     * MR or other frameworks can use this method to get all statistics at
+     * once.
+     * @return the StatisticsData
+     */
+    public StatisticsData getData() {
+      return visitAll(new StatisticsAggregator<StatisticsData>() {
+        private StatisticsData all = new StatisticsData();
+
+        @Override
+        public void accept(StatisticsData data) {
+          all.add(data);
+        }
+
+        public StatisticsData aggregate() {
+          return all;
+        }
+      });
+    }
 
     @Override
     public String toString() {

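For context, here is a minimal sketch of how a client application might
consume the new per-distance counters (illustrative only, not part of this
commit; the file path is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadDistanceStats {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    try (FSDataInputStream in = fs.open(new Path("/tmp/sample"))) {
      in.read(new byte[4096]);  // trigger some reads
    }
    FileSystem.Statistics stats =
        FileSystem.getStatistics(fs.getUri().getScheme(), fs.getClass());
    // Distances are grouped: 0 = local host, {1, 2} = same rack,
    // {3, 4} = remote rack of first degree, 5+ = farther away.
    System.out.println("local:       " + stats.getBytesReadByDistance(0));
    System.out.println("same rack:   " + stats.getBytesReadByDistance(2));
    System.out.println("remote rack: " + stats.getBytesReadByDistance(4));
    // Frameworks such as MR can take a full snapshot in one call:
    FileSystem.Statistics.StatisticsData data = stats.getData();
    System.out.println("total read:  " + data.getBytesRead());
  }
}
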
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
index e475149..2c3661a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
@@ -638,13 +638,27 @@ public class NetUtils {
 
   /**
    * Return hostname without throwing exception.
+   * The returned hostname String format is "hostname".
+   * @return hostname
+   */
+  public static String getLocalHostname() {
+    try {
+      return InetAddress.getLocalHost().getHostName();
+    } catch(UnknownHostException uhe) {
+      return "" + uhe;
+    }
+  }
+
+  /**
+   * Return hostname without throwing exception.
+   * The returned hostname String format is "hostname/ip address".
    * @return hostname
    */
   public static String getHostname() {
     try {return "" + InetAddress.getLocalHost();}
     catch(UnknownHostException uhe) {return "" + uhe;}
   }
-  
+
   /**
    * Compose a "host:port" string from the address.
    */

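To illustrate the difference between the two helpers (a sketch; the actual
output depends on the local host's DNS setup):

import org.apache.hadoop.net.NetUtils;

public class HostnameFormats {
  public static void main(String[] args) {
    // New helper: bare hostname, e.g. "host1.example.com"
    System.out.println(NetUtils.getLocalHostname());
    // Existing helper: "hostname/ip address", e.g. "host1.example.com/192.0.2.10"
    System.out.println(NetUtils.getHostname());
  }
}
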
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index b637da1..e1d2968 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -369,6 +369,16 @@ public class NetworkTopology {
     int getNumOfLeaves() {
       return numOfLeaves;
     }
+
+    @Override
+    public int hashCode() {
+      return super.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object to) {
+      return super.equals(to);
+    }
   } // end of InnerNode
 
   /**
@@ -607,9 +617,14 @@ public class NetworkTopology {
    *  or {@link Integer#MAX_VALUE} if node1 or node2 do not belong to the cluster
    */
   public int getDistance(Node node1, Node node2) {
-    if (node1 == node2) {
+    if ((node1 != null && node1.equals(node2)) ||
+        (node1 == null && node2 == null))  {
       return 0;
     }
+    if (node1 == null || node2 == null) {
+      LOG.warn("One of the nodes is a null pointer");
+      return Integer.MAX_VALUE;
+    }
     Node n1=node1, n2=node2;
     int dis = 0;
     netlock.readLock().lock();

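The new equality and null handling can be exercised with a toy topology (a
sketch; host and rack names are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;

public class DistanceSemantics {
  public static void main(String[] args) {
    NetworkTopology topology = NetworkTopology.getInstance(new Configuration());
    NodeBase n1 = new NodeBase("dn1.example.com", "/rack1");
    NodeBase n2 = new NodeBase("dn2.example.com", "/rack1");
    topology.add(n1);
    topology.add(n2);
    System.out.println(topology.getDistance(n1, n2)); // 2: same rack
    // Equal paths now count as distance 0 even for distinct objects:
    NodeBase n1Copy = new NodeBase("dn1.example.com", "/rack1");
    System.out.println(topology.getDistance(n1, n1Copy)); // 0
    // A single null operand yields Integer.MAX_VALUE (plus a warning):
    System.out.println(topology.getDistance(n1, null));
  }
}
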
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
index b136297..b465098 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NodeBase.java
@@ -112,7 +112,23 @@ public class NodeBase implements Node {
   public static String getPath(Node node) {
     return node.getNetworkLocation() + PATH_SEPARATOR_STR + node.getName();
   }
-  
+
+  @Override
+  public boolean equals(Object to) {
+    if (this == to) {
+      return true;
+    }
+    if (!(to instanceof NodeBase)) {
+      return false;
+    }
+    return getPath(this).equals(getPath((NodeBase)to));
+  }
+
+  @Override
+  public int hashCode() {
+    return getPath(this).hashCode();
+  }
+
   /** @return this node's path as its string representation */
   @Override
   public String toString() {

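These overrides make equality a function of the node's path, which is what
lets the nodeToDistance cache added to ClientContext (below) find an entry
when each lookup wraps a DatanodeInfo in a fresh NodeBase. A minimal sketch
with made-up names:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.net.NodeBase;

public class PathBasedEquality {
  public static void main(String[] args) {
    Map<NodeBase, Integer> nodeToDistance = new ConcurrentHashMap<>();
    nodeToDistance.put(new NodeBase("dn1.example.com", "/rack1"), 2);
    // A distinct instance with the same path finds the cached entry,
    // which would have been a miss under identity-based equality:
    Integer distance =
        nodeToDistance.get(new NodeBase("dn1.example.com", "/rack1"));
    System.out.println(distance); // 2
  }
}
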
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
index 150cf23..63acaa7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -84,11 +84,6 @@ public interface BlockReader extends ByteBufferReadable, Closeable {
   int readAll(byte[] buf, int offset, int len) throws IOException;
 
   /**
-   * @return              true only if this is a local read.
-   */
-  boolean isLocal();
-
-  /**
    * @return              true only if this is a short-circuit read.
    *                      All short-circuit reads are also local.
    */
@@ -107,4 +102,9 @@ public interface BlockReader extends ByteBufferReadable, Closeable {
    * @return              The DataChecksum used by the read block
    */
   DataChecksum getDataChecksum();
+
+  /**
+   * Return the network distance between the local machine and the remote
+   * machine.
+   */
+  int getNetworkDistance();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index 5c7bbd7..8a0050f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -833,16 +833,19 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
 
   @SuppressWarnings("deprecation")
   private BlockReader getRemoteBlockReader(Peer peer) throws IOException {
+    int networkDistance = clientContext.getNetworkDistance(datanode);
     if (conf.getShortCircuitConf().isUseLegacyBlockReader()) {
       return RemoteBlockReader.newBlockReader(fileName,
           block, token, startOffset, length, conf.getIoBufferSize(),
           verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer);
+          clientContext.getPeerCache(), cachingStrategy, tracer,
+          networkDistance);
     } else {
       return RemoteBlockReader2.newBlockReader(
           fileName, block, token, startOffset, length,
           verifyChecksum, clientName, peer, datanode,
-          clientContext.getPeerCache(), cachingStrategy, tracer);
+          clientContext.getPeerCache(), cachingStrategy, tracer,
+          networkDistance);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index 859380c..68630c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -641,11 +641,6 @@ class BlockReaderLocal implements BlockReader {
   }
 
   @Override
-  public boolean isLocal() {
-    return true;
-  }
-
-  @Override
   public boolean isShortCircuit() {
     return true;
   }
@@ -721,4 +716,9 @@ class BlockReaderLocal implements BlockReader {
   public DataChecksum getDataChecksum() {
     return checksum;
   }
+
+  @Override
+  public int getNetworkDistance() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 7206c07..65a8373 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -723,11 +723,6 @@ class BlockReaderLocalLegacy implements BlockReader {
   }
 
   @Override
-  public boolean isLocal() {
-    return true;
-  }
-
-  @Override
   public boolean isShortCircuit() {
     return true;
   }
@@ -741,4 +736,9 @@ class BlockReaderLocalLegacy implements BlockReader {
   public DataChecksum getDataChecksum() {
     return checksum;
   }
+
+  @Override
+  public int getNetworkDistance() {
+    return 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index 047645b..47d6d49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -17,16 +17,28 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -101,7 +113,12 @@ public class ClientContext {
    */
   private boolean printedConfWarning = false;
 
-  private ClientContext(String name, DfsClientConf conf) {
+  private final NetworkTopology topology;
+  private final NodeBase clientNode;
+  private final Map<NodeBase, Integer> nodeToDistance;
+
+  private ClientContext(String name, DfsClientConf conf,
+      Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();
 
     this.name = name;
@@ -116,14 +133,28 @@ public class ClientContext {
 
     this.byteArrayManager = ByteArrayManager.newInstance(
         conf.getWriteByteArrayManagerConf());
+
+    DNSToSwitchMapping dnsToSwitchMapping = ReflectionUtils.newInstance(
+        config.getClass(
+            CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+            ScriptBasedMapping.class, DNSToSwitchMapping.class), config);
+    List<String> nodes = new ArrayList<>();
+    String clientHostName = NetUtils.getLocalHostname();
+    nodes.add(clientHostName);
+    clientNode = new NodeBase(clientHostName,
+        dnsToSwitchMapping.resolve(nodes).get(0));
+    this.topology = NetworkTopology.getInstance(config);
+    this.topology.add(clientNode);
+    this.nodeToDistance = new ConcurrentHashMap<>();
   }
 
-  public static ClientContext get(String name, DfsClientConf conf) {
+  public static ClientContext get(String name, DfsClientConf conf,
+      Configuration config) {
     ClientContext context;
     synchronized(ClientContext.class) {
       context = CACHES.get(name);
       if (context == null) {
-        context = new ClientContext(name, conf);
+        context = new ClientContext(name, conf, config);
         CACHES.put(name, context);
       } else {
         context.printConfWarningIfNeeded(conf);
@@ -132,6 +163,10 @@ public class ClientContext {
     return context;
   }
 
+  public static ClientContext get(String name, Configuration config) {
+    return get(name, new DfsClientConf(config), config);
+  }
+
   /**
    * Get a client context, from a Configuration object.
    *
@@ -141,8 +176,7 @@ public class ClientContext {
   @VisibleForTesting
   public static ClientContext getFromConf(Configuration conf) {
     return get(conf.get(HdfsClientConfigKeys.DFS_CLIENT_CONTEXT,
-        HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT),
-            new DfsClientConf(conf));
+        HdfsClientConfigKeys.DFS_CLIENT_CONTEXT_DEFAULT), conf);
   }
 
   private void printConfWarningIfNeeded(DfsClientConf conf) {
@@ -193,4 +227,16 @@ public class ClientContext {
   public ByteArrayManager getByteArrayManager() {
     return byteArrayManager;
   }
+
+  public int getNetworkDistance(DatanodeInfo datanodeInfo) {
+    NodeBase node = new NodeBase(datanodeInfo.getHostName(),
+        datanodeInfo.getNetworkLocation());
+    Integer distance = nodeToDistance.get(node);
+    if (distance == null) {
+      topology.add(node);
+      distance = topology.getDistance(clientNode, node);
+      nodeToDistance.put(node, distance);
+    }
+    return distance;
+  }
 }

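The client now resolves its own rack through the same
net.topology.node.switch.mapping.impl hook the NameNode uses, and memoizes
per-datanode distances in nodeToDistance. For a test or a single-rack setup,
one could plug in a trivial mapping along these lines (a sketch; the class
name and rack are hypothetical):

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.net.DNSToSwitchMapping;

public class StaticRackMapping implements DNSToSwitchMapping {
  @Override
  public List<String> resolve(List<String> names) {
    List<String> racks = new ArrayList<>();
    for (int i = 0; i < names.size(); i++) {
      racks.add("/rack1"); // place every host on the same rack
    }
    return racks;
  }

  @Override
  public void reloadCachedMappings() {
    // nothing cached
  }

  @Override
  public void reloadCachedMappings(List<String> names) {
    // nothing cached
  }
}

It would be wired in with config.setClass(
CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticRackMapping.class, DNSToSwitchMapping.class).
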
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0976920..3506d3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -212,7 +212,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final String clientName;
   final SocketFactory socketFactory;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
-  final FileSystem.Statistics stats;
+  private final FileSystem.Statistics stats;
   private final String authority;
   private final Random r = new Random();
   private SocketAddress[] localInterfaceAddrs;
@@ -357,7 +357,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         new CachingStrategy(writeDropBehind, readahead);
     this.clientContext = ClientContext.get(
         conf.get(DFS_CLIENT_CONTEXT, DFS_CLIENT_CONTEXT_DEFAULT),
-        dfsClientConf);
+        dfsClientConf, conf);
 
     if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
       this.initThreadsNumForHedgedReads(dfsClientConf.
@@ -2740,6 +2740,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     }
   }
 
+  void updateFileSystemReadStats(int distance, int nRead) {
+    if (stats != null) {
+      stats.incrementBytesRead(nRead);
+      stats.incrementBytesReadByDistance(distance, nRead);
+    }
+  }
+
   /**
    * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
    * it does not already exist.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d713e8f..7661e82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -775,7 +775,7 @@ public class DFSInputStream extends FSInputStream
     synchronized(infoLock) {
       if (blockReader.isShortCircuit()) {
         readStatistics.addShortCircuitBytes(nRead);
-      } else if (blockReader.isLocal()) {
+      } else if (blockReader.getNetworkDistance() == 0) {
         readStatistics.addLocalBytes(nRead);
       } else {
         readStatistics.addRemoteBytes(nRead);
@@ -798,6 +798,8 @@ public class DFSInputStream extends FSInputStream
         throws IOException {
       int nRead = blockReader.read(buf, off, len);
       updateReadStatistics(readStatistics, nRead, blockReader);
+      dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+          nRead);
       return nRead;
     }
 
@@ -828,6 +830,8 @@ public class DFSInputStream extends FSInputStream
         int ret = blockReader.read(buf);
         success = true;
         updateReadStatistics(readStatistics, ret, blockReader);
+        dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(),
+            ret);
         if (ret == 0) {
           DFSClient.LOG.warn("zero");
         }
@@ -939,9 +943,6 @@ public class DFSInputStream extends FSInputStream
             // got a EOS from reader though we expect more data on it.
             throw new IOException("Unexpected EOS from the reader");
           }
-          if (dfsClient.stats != null) {
-            dfsClient.stats.incrementBytesRead(result);
-          }
           return result;
         } catch (ChecksumException ce) {
           throw ce;
@@ -1194,6 +1195,8 @@ public class DFSInputStream extends FSInputStream
             datanode.storageType, datanode.info);
         int nread = reader.readAll(buf, offset, len);
         updateReadStatistics(readStatistics, nread, reader);
+        dfsClient.updateFileSystemReadStats(
+            reader.getNetworkDistance(), nread);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
               "excpected " + len + ", got " + nread);
@@ -1479,9 +1482,6 @@ public class DFSInputStream extends FSInputStream
       offset += bytesToRead;
     }
     assert remaining == 0 : "Wrong number of bytes read.";
-    if (dfsClient.stats != null) {
-      dfsClient.stats.incrementBytesRead(realLen);
-    }
     return realLen;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f6547f3..38236ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -447,9 +447,6 @@ public class DFSStripedInputStream extends DFSInputStream {
           result += ret;
           pos += ret;
         }
-        if (dfsClient.stats != null) {
-          dfsClient.stats.incrementBytesRead(result);
-        }
         return result;
       } finally {
         // Check if need to report block replicas corruption either read

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
index 42bec5c..707a56a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java
@@ -110,11 +110,6 @@ public final class ExternalBlockReader implements BlockReader {
   }
 
   @Override
-  public boolean isLocal() {
-    return accessor.isLocal();
-  }
-
-  @Override
   public boolean isShortCircuit() {
     return accessor.isShortCircuit();
   }
@@ -129,4 +124,9 @@ public final class ExternalBlockReader implements BlockReader {
   public DataChecksum getDataChecksum() {
     return null;
   }
+
+  @Override
+  public int getNetworkDistance() {
+    return accessor.getNetworkDistance();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 544e1b3..7e094f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.htrace.core.TraceScope;
@@ -93,11 +92,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
    */
   private final long bytesNeededToFinish;
 
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
   private boolean eos = false;
   private boolean sentStatusCode = false;
 
@@ -109,6 +103,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
   private final Tracer tracer;
 
+  private final int networkDistance;
+
   /* FSInputChecker interface */
 
   /* same interface as inputStream java.io.InputStream#read()
@@ -342,7 +338,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   private RemoteBlockReader(String file, String bpid, long blockId,
       DataInputStream in, DataChecksum checksum, boolean verifyChecksum,
       long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
+      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+      int networkDistance) {
     // Path is used only for printing block and file information in debug
     super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId +
             ":" + bpid + ":of:"+ file)/*too non path-like?*/,
@@ -351,9 +348,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
         checksum.getBytesPerChecksum(),
         checksum.getChecksumSize());
 
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
-
     this.peer = peer;
     this.datanodeID = datanodeID;
     this.in = in;
@@ -375,6 +369,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     checksumSize = this.checksum.getChecksumSize();
     this.peerCache = peerCache;
     this.tracer = tracer;
+    this.networkDistance = networkDistance;
   }
 
   /**
@@ -400,7 +395,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
       DatanodeID datanodeID,
       PeerCache peerCache,
       CachingStrategy cachingStrategy,
-      Tracer tracer)
+      Tracer tracer, int networkDistance)
       throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out =
@@ -436,7 +431,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
     return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(),
         in, checksum, verifyChecksum, startOffset, firstChunkOffset, len,
-        peer, datanodeID, peerCache, tracer);
+        peer, datanodeID, peerCache, tracer, networkDistance);
   }
 
   @Override
@@ -494,11 +489,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   }
 
   @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
-  @Override
   public boolean isShortCircuit() {
     return false;
   }
@@ -512,4 +502,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   public DataChecksum getDataChecksum() {
     return checksum;
   }
+
+  @Override
+  public int getNetworkDistance() {
+    return networkDistance;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 22e4757..9437353 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.htrace.core.TraceScope;
@@ -116,17 +115,14 @@ public class RemoteBlockReader2  implements BlockReader {
    */
   private long bytesNeededToFinish;
 
-  /**
-   * True if we are reading from a local DataNode.
-   */
-  private final boolean isLocal;
-
   private final boolean verifyChecksum;
 
   private boolean sentStatusCode = false;
 
   private final Tracer tracer;
 
+  private final int networkDistance;
+
   @VisibleForTesting
   public Peer getPeer() {
     return peer;
@@ -280,9 +276,8 @@ public class RemoteBlockReader2  implements BlockReader {
   protected RemoteBlockReader2(String file, long blockId,
       DataChecksum checksum, boolean verifyChecksum,
       long startOffset, long firstChunkOffset, long bytesToRead, Peer peer,
-      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) {
-    this.isLocal = DFSUtilClient.isLocalAddress(NetUtils.
-        createSocketAddr(datanodeID.getXferAddr()));
+      DatanodeID datanodeID, PeerCache peerCache, Tracer tracer,
+      int networkDistance) {
     // Path is used only for printing block and file information in debug
     this.peer = peer;
     this.datanodeID = datanodeID;
@@ -302,6 +297,7 @@ public class RemoteBlockReader2  implements BlockReader {
     bytesPerChecksum = this.checksum.getBytesPerChecksum();
     checksumSize = this.checksum.getChecksumSize();
     this.tracer = tracer;
+    this.networkDistance = networkDistance;
   }
 
 
@@ -397,7 +393,8 @@ public class RemoteBlockReader2  implements BlockReader {
       Peer peer, DatanodeID datanodeID,
       PeerCache peerCache,
       CachingStrategy cachingStrategy,
-      Tracer tracer) throws IOException {
+      Tracer tracer,
+      int networkDistance) throws IOException {
     // in and out will be closed when sock is closed (by the caller)
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         peer.getOutputStream()));
@@ -430,7 +427,7 @@ public class RemoteBlockReader2  implements BlockReader {
 
     return new RemoteBlockReader2(file, block.getBlockId(), checksum,
         verifyChecksum, startOffset, firstChunkOffset, len, peer, datanodeID,
-        peerCache, tracer);
+        peerCache, tracer, networkDistance);
   }
 
   static void checkSuccess(
@@ -454,11 +451,6 @@ public class RemoteBlockReader2  implements BlockReader {
   }
 
   @Override
-  public boolean isLocal() {
-    return isLocal;
-  }
-
-  @Override
   public boolean isShortCircuit() {
     return false;
   }
@@ -472,4 +464,9 @@ public class RemoteBlockReader2  implements BlockReader {
   public DataChecksum getDataChecksum() {
     return checksum;
   }
+
+  @Override
+  public int getNetworkDistance() {
+    return networkDistance;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
index e0b21e8..556c2c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java
@@ -87,4 +87,11 @@ public abstract class ReplicaAccessor {
    * short-circuit byte count statistics.
    */
   public abstract boolean isShortCircuit();
+
+  /**
+   * Return the network distance between the local machine and the remote
+   * machine.
+   */
+  public int getNetworkDistance() {
+    return isLocal() ? 0 : Integer.MAX_VALUE;
+  }
 }

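The default shown above is deliberately conservative: 0 when the accessor
reports local, otherwise Integer.MAX_VALUE. An external plugin that knows its
real topology can override it. A hedged sketch follows (the class name, the
distance of 2, and the stubbed reads are invented, and the other overrides
assume the abstract methods declared by ReplicaAccessor on this branch):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hdfs.ReplicaAccessor;

public class RackAwareAccessor extends ReplicaAccessor {
  @Override
  public int read(long pos, byte[] buf, int off, int len) throws IOException {
    return -1; // stub: report EOF immediately
  }

  @Override
  public int read(long pos, ByteBuffer buf) throws IOException {
    return -1; // stub: report EOF immediately
  }

  @Override
  public void close() throws IOException {
  }

  @Override
  public long getVisibleLength() {
    return 0;
  }

  @Override
  public boolean isLocal() {
    return false;
  }

  @Override
  public boolean isShortCircuit() {
    return false;
  }

  @Override
  public int getNetworkDistance() {
    return 2; // this plugin knows its replicas live on the local rack
  }
}
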
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index bde8d80..74fb3e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -849,12 +849,13 @@ public final class ErasureCodingWorker {
          * read directly from DN and need to check the replica is FINALIZED
          * state, notice we should not use short-circuit local read which
          * requires config for domain-socket in UNIX or legacy config in Windows.
+         * The network distance value isn't used for this scenario.
          */
         return RemoteBlockReader2.newBlockReader(
             "dummy", block, blockToken, offsetInBlock, 
             block.getNumBytes() - offsetInBlock, true,
             "", newConnectedPeer(block, dnAddr, blockToken, dnInfo), dnInfo,
-            null, cachingStrategy, datanode.getTracer());
+            null, cachingStrategy, datanode.getTracer(), -1);
       } catch (IOException e) {
         LOG.debug("Exception while creating remote block reader, datanode {}",
             dnInfo, e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
index 3455f55..a1af1fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
@@ -358,7 +358,7 @@ public class TestEnhancedByteBufferAccess {
     fsIn.close();
     fsIn = fs.open(TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache();
     cache.accept(new CountingVisitor(0, 5, 5, 0));
     results[0] = fsIn.read(null, BLOCK_SIZE,
         EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@@ -661,7 +661,7 @@ public class TestEnhancedByteBufferAccess {
     final ExtendedBlock firstBlock =
         DFSTestUtil.getFirstBlock(fs, TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, new DfsClientConf(conf)). getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache();
     waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
     // Uncache the replica
     fs.removeCacheDirective(directiveId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
index 2d6c63a..0048d2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
 import org.apache.hadoop.fs.FsTracer;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
 import org.apache.hadoop.util.Time;
@@ -736,7 +737,8 @@ public class TestBlockReaderLocal {
     byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
     FileSystem fs = null;
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      cluster = new MiniDFSCluster.Builder(conf).
+          hosts(new String[] {NetUtils.getLocalHostname()}).build();
       cluster.waitActive();
       fs = cluster.getFileSystem();
       DFSTestUtil.createFile(fs, TEST_PATH,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
index afa5d27..8d2398d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
@@ -99,8 +99,6 @@ public class TestConnCache {
     DFSClient client = new DFSClient(
         new InetSocketAddress("localhost",
             util.getCluster().getNameNodePort()), util.getConf());
-    ClientContext cacheContext =
-        ClientContext.get(contextName, client.getConf());
     DFSInputStream in = client.open(testFile.toString());
     LOG.info("opened " + testFile.toString());
     byte[] dataBuf = new byte[BLOCK_SIZE];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 6217c45..1db0da8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
@@ -651,6 +652,67 @@ public class TestDistributedFileSystem {
     assertEquals(largeReadOps, DFSTestUtil.getStatistics(fs).getLargeReadOps());
   }
 
+  /** Checks read statistics. */
+  private void checkReadStatistics(FileSystem fs, int distance,
+      long expectedReadBytes) {
+    long bytesRead = DFSTestUtil.getStatistics(fs).
+        getBytesReadByDistance(distance);
+    assertEquals(expectedReadBytes, bytesRead);
+  }
+
+  @Test
+  public void testLocalHostReadStatistics() throws Exception {
+    testReadFileSystemStatistics(0);
+  }
+
+  @Test
+  public void testLocalRackReadStatistics() throws Exception {
+    testReadFileSystemStatistics(2);
+  }
+
+  @Test
+  public void testRemoteRackOfFirstDegreeReadStatistics() throws Exception {
+    testReadFileSystemStatistics(4);
+  }
+
+  /** expectedDistance is the expected distance between the client and the
+   * datanode.
+   * 0 means local host.
+   * 2 means same rack.
+   * 4 means remote rack of first degree.
+   */
+  private void testReadFileSystemStatistics(int expectedDistance)
+      throws IOException {
+    MiniDFSCluster cluster = null;
+    final Configuration conf = getTestConfiguration();
+
+    // create a cluster with a dn with the expected distance.
+    if (expectedDistance == 0) {
+      cluster = new MiniDFSCluster.Builder(conf).
+          hosts(new String[] {NetUtils.getLocalHostname()}).build();
+    } else if (expectedDistance == 2) {
+      cluster = new MiniDFSCluster.Builder(conf).
+          hosts(new String[] {"hostFoo"}).build();
+    } else if (expectedDistance == 4) {
+      cluster = new MiniDFSCluster.Builder(conf).
+          racks(new String[] {"/rackFoo"}).build();
+    }
+
+    // create a file, read the file and verify the metrics
+    try {
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.getStatistics(fs).reset();
+      Path dir = new Path("/test");
+      Path file = new Path(dir, "file");
+      String input = "hello world";
+      DFSTestUtil.writeFile(fs, file, input);
+      FSDataInputStream stm = fs.open(file);
+      byte[] actual = new byte[4096];
+      stm.read(actual);
+      checkReadStatistics(fs, expectedDistance, input.length());
+    } finally {
+      if (cluster != null) cluster.shutdown();
+    }
+  }
+
   @Test
   public void testFileChecksum() throws Exception {
     final long seed = RAN.nextLong();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
index 2c36baa..5c2b6da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -246,6 +247,11 @@ public class TestExternalBlockReader {
       return true;
     }
 
+    @Override
+    public int getNetworkDistance() {
+      return 0;
+    }
+
     synchronized String getError() {
       return error;
     }
@@ -271,7 +277,7 @@ public class TestExternalBlockReader {
     String uuid = UUID.randomUUID().toString();
     conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(1)
+        .hosts(new String[] {NetUtils.getLocalHostname()})
         .build();
     final int TEST_LENGTH = 2047;
     DistributedFileSystem dfs = cluster.getFileSystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cd8b6889/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index 45f6cb4..736230c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -129,6 +129,13 @@ public class TestNetworkTopology {
     assertEquals(cluster.getDistance(dataNodes[0], dataNodes[1]), 2);
     assertEquals(cluster.getDistance(dataNodes[0], dataNodes[3]), 4);
     assertEquals(cluster.getDistance(dataNodes[0], dataNodes[6]), 6);
+    // verify the distance is zero as long as two nodes have the same path.
+    // They don't need to refer to the same object.
+    NodeBase node1 = new NodeBase(dataNodes[0].getHostName(),
+        dataNodes[0].getNetworkLocation());
+    NodeBase node2 = new NodeBase(dataNodes[0].getHostName(),
+        dataNodes[0].getNetworkLocation());
+    assertEquals(0, cluster.getDistance(node1, node2));
   }
 
   @Test