Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/03 22:35:51 UTC

git commit: HDFS-7055. Add tracing to DFSInputStream (cmccabe)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 34cdcaad7 -> 7f6ed7fe3


HDFS-7055. Add tracing to DFSInputStream (cmccabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f6ed7fe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f6ed7fe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f6ed7fe

Branch: refs/heads/trunk
Commit: 7f6ed7fe365166e8075359f1d0ad035fa876c70f
Parents: 34cdcaa
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Oct 3 13:28:24 2014 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Oct 3 13:35:43 2014 -0700

----------------------------------------------------------------------
 .../hadoop/fs/CommonConfigurationKeys.java      |  3 +
 .../apache/hadoop/tracing/SpanReceiverHost.java | 49 +++++++++++-
 .../hadoop/tracing/TraceSamplerFactory.java     | 53 +++++++++++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  1 +
 .../apache/hadoop/hdfs/BlockReaderLocal.java    | 83 +++++++++++---------
 .../hadoop/hdfs/BlockReaderLocalLegacy.java     | 31 +++++---
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 30 +++++++
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 57 +++++++++++---
 .../apache/hadoop/hdfs/RemoteBlockReader.java   | 20 ++++-
 .../apache/hadoop/hdfs/RemoteBlockReader2.java  | 23 +++++-
 .../hdfs/server/datanode/BlockSender.java       | 14 ++++
 11 files changed, 297 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 442dc7d..748c552 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -286,4 +286,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
   public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw";
+
+  public static final String  HADOOP_TRACE_SAMPLER = "hadoop.htrace.sampler";
+  public static final String  HADOOP_TRACE_SAMPLER_DEFAULT = "NeverSampler";
 }
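
Taken together, the two new keys let a client pick a sampler with no code changes. A minimal sketch of selecting one programmatically (the key names and the three accepted values come from this commit; the percentage key is read straight from the Hadoop Configuration by the TraceSamplerFactory added below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    Configuration conf = new Configuration();
    // Accepted values: "NeverSampler" (the default), "AlwaysSampler",
    // and "ProbabilitySampler".
    conf.set(CommonConfigurationKeys.HADOOP_TRACE_SAMPLER, "ProbabilitySampler");
    // Interpreted as a percentage: 1.0 means 1% of top-level spans.
    conf.setDouble("htrace.probability.sampler.percentage", 1.0d);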

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
index d912504..82f099e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
@@ -17,24 +17,30 @@
  */
 package org.apache.hadoop.tracing;
 
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.htrace.HTraceConfiguration;
 import org.htrace.SpanReceiver;
 import org.htrace.Trace;
- 
 
 /**
  * This class provides functions for reading the names of SpanReceivers from
@@ -45,7 +51,7 @@ import org.htrace.Trace;
 @InterfaceAudience.Private
 public class SpanReceiverHost implements TraceAdminProtocol {
   public static final String SPAN_RECEIVERS_CONF_KEY =
-    "hadoop.trace.spanreceiver.classes";
+    "hadoop.htrace.spanreceiver.classes";
   private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
   private final TreeMap<Long, SpanReceiver> receivers =
       new TreeMap<Long, SpanReceiver>();
@@ -53,6 +59,9 @@ public class SpanReceiverHost implements TraceAdminProtocol {
   private boolean closed = false;
   private long highestId = 1;
 
+  private final static String LOCAL_FILE_SPAN_RECEIVER_PATH =
+      "hadoop.htrace.local-file-span-receiver.path";
+
   private static enum SingletonHolder {
     INSTANCE;
     Object lock = new Object();
@@ -81,9 +90,32 @@ public class SpanReceiverHost implements TraceAdminProtocol {
 
   private static List<ConfigurationPair> EMPTY = Collections.emptyList();
 
+  private static String getUniqueLocalTraceFileName() {
+    String tmp = System.getProperty("java.io.tmpdir", "/tmp");
+    String nonce = null;
+    BufferedReader reader = null;
+    try {
+      // On Linux we can get a unique local file name by reading the process id
+      // out of /proc/self/stat.  (There isn't any portable way to get the
+      // process ID from Java.)
+      reader = new BufferedReader(
+          new InputStreamReader(new FileInputStream("/proc/self/stat")));
+      String line = reader.readLine();
+      nonce = line.split(" ")[0];
+    } catch (IOException e) {
+    } finally {
+      IOUtils.cleanup(LOG, reader);
+    }
+    if (nonce == null) {
+      // If we can't use the process ID, use a random nonce.
+      nonce = UUID.randomUUID().toString();
+    }
+    return new File(tmp, nonce).getAbsolutePath();
+  }
+
   /**
    * Reads the names of classes specified in the
-   * "hadoop.trace.spanreceiver.classes" property and instantiates and registers
+   * "hadoop.htrace.spanreceiver.classes" property and instantiates and registers
    * them with the Tracer as SpanReceiver's.
    *
    * The nullary constructor is called during construction, but if the classes
@@ -98,8 +130,17 @@ public class SpanReceiverHost implements TraceAdminProtocol {
     if (receiverNames == null || receiverNames.length == 0) {
       return;
     }
+    // It's convenient to have each daemon log to a random trace file when
+    // testing.
+    if (config.get(LOCAL_FILE_SPAN_RECEIVER_PATH) == null) {
+      config.set(LOCAL_FILE_SPAN_RECEIVER_PATH,
+          getUniqueLocalTraceFileName());
+    }
     for (String className : receiverNames) {
       className = className.trim();
+      if (!className.contains(".")) {
+        className = "org.htrace.impl." + className;
+      }
       try {
         SpanReceiver rcvr = loadInstance(className, EMPTY);
         Trace.addReceiver(rcvr);
@@ -145,7 +186,7 @@ public class SpanReceiverHost implements TraceAdminProtocol {
       extraMap.put(pair.getKey(), pair.getValue());
     }
     return new HTraceConfiguration() {
-      public static final String HTRACE_CONF_PREFIX = "hadoop.";
+      public static final String HTRACE_CONF_PREFIX = "hadoop.htrace.";
 
       @Override
       public String get(String key) {
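
The receiver side of the same setup, sketched against the new "hadoop.htrace." prefix (SPAN_RECEIVERS_CONF_KEY and the local-file path key are defined in this file; LocalFileSpanReceiver is assumed to be the stock receiver shipped with htrace):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tracing.SpanReceiverHost;

    Configuration conf = new Configuration();
    // Short class names are now expanded to org.htrace.impl.<name>.
    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, "LocalFileSpanReceiver");
    // Optional: pin the output path.  If unset, a unique per-process file
    // under java.io.tmpdir is chosen (see getUniqueLocalTraceFileName above).
    conf.set("hadoop.htrace.local-file-span-receiver.path", "/tmp/htrace.out");
    SpanReceiverHost host = SpanReceiverHost.getInstance(conf);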

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java
new file mode 100644
index 0000000..0de7d3e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tracing;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.htrace.Sampler;
+import org.htrace.impl.ProbabilitySampler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class TraceSamplerFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TraceSamplerFactory.class);
+
+  public static Sampler createSampler(Configuration conf) {
+    String samplerStr = conf.get(CommonConfigurationKeys.HADOOP_TRACE_SAMPLER,
+        CommonConfigurationKeys.HADOOP_TRACE_SAMPLER_DEFAULT);
+    if (samplerStr.equals("NeverSampler")) {
+      LOG.debug("HTrace is OFF for all spans.");
+      return Sampler.NEVER;
+    } else if (samplerStr.equals("AlwaysSampler")) {
+      LOG.info("HTrace is ON for all spans.");
+      return Sampler.ALWAYS;
+    } else if (samplerStr.equals("ProbabilitySampler")) {
+      double percentage =
+          conf.getDouble("htrace.probability.sampler.percentage", 0.01d);
+      LOG.info("HTrace is ON for " + percentage + "% of top-level spans.");
+      return new ProbabilitySampler(percentage / 100.0d);
+    } else {
+      throw new RuntimeException("Can't create sampler " + samplerStr +
+          ".  Available samplers are NeverSampler, AlwaysSampler, " +
+          "and ProbabilitySampler.");
+    }
+  }
+}
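
Typical use mirrors how DFSClient wires this up later in the diff: build the sampler once from the Configuration, then hand it to Trace.startSpan for each top-level operation (the span name here is illustrative):

    import org.htrace.Sampler;
    import org.htrace.Trace;
    import org.htrace.TraceScope;

    Sampler sampler = TraceSamplerFactory.createSampler(conf);
    TraceScope scope = Trace.startSpan("myOperation", sampler);
    try {
      // ... traced work ...
    } finally {
      scope.close();
    }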

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c9e2bd0..6308779 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -365,6 +365,7 @@ Release 2.7.0 - UNRELEASED
   NEW FEATURES
 
   IMPROVEMENTS
+    HDFS-7055. Add tracing to DFSInputStream (cmccabe)
 
   OPTIMIZATIONS
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index cd75e53..3954755 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -36,6 +36,9 @@ import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 /**
  * BlockReaderLocal enables local short circuited reads. If the DFS client is on
@@ -304,48 +307,54 @@ class BlockReaderLocal implements BlockReader {
    */
   private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
       throws IOException {
-    int total = 0;
-    long startDataPos = dataPos;
-    int startBufPos = buf.position();
-    while (buf.hasRemaining()) {
-      int nRead = dataIn.read(buf, dataPos);
-      if (nRead < 0) {
-        break;
+    TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
+        block.getBlockId() + ")", Sampler.NEVER);
+    try {
+      int total = 0;
+      long startDataPos = dataPos;
+      int startBufPos = buf.position();
+      while (buf.hasRemaining()) {
+        int nRead = dataIn.read(buf, dataPos);
+        if (nRead < 0) {
+          break;
+        }
+        dataPos += nRead;
+        total += nRead;
       }
-      dataPos += nRead;
-      total += nRead;
-    }
-    if (canSkipChecksum) {
-      freeChecksumBufIfExists();
-      return total;
-    }
-    if (total > 0) {
-      try {
-        buf.limit(buf.position());
-        buf.position(startBufPos);
-        createChecksumBufIfNeeded();
-        int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
-        checksumBuf.clear();
-        checksumBuf.limit(checksumsNeeded * checksumSize);
-        long checksumPos =
-          7 + ((startDataPos / bytesPerChecksum) * checksumSize);
-        while (checksumBuf.hasRemaining()) {
-          int nRead = checksumIn.read(checksumBuf, checksumPos);
-          if (nRead < 0) {
-            throw new IOException("Got unexpected checksum file EOF at " +
-                checksumPos + ", block file position " + startDataPos + " for " +
-                "block " + block + " of file " + filename);
+      if (canSkipChecksum) {
+        freeChecksumBufIfExists();
+        return total;
+      }
+      if (total > 0) {
+        try {
+          buf.limit(buf.position());
+          buf.position(startBufPos);
+          createChecksumBufIfNeeded();
+          int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+          checksumBuf.clear();
+          checksumBuf.limit(checksumsNeeded * checksumSize);
+          long checksumPos =
+              7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+          while (checksumBuf.hasRemaining()) {
+            int nRead = checksumIn.read(checksumBuf, checksumPos);
+            if (nRead < 0) {
+              throw new IOException("Got unexpected checksum file EOF at " +
+                  checksumPos + ", block file position " + startDataPos + " for " +
+                  "block " + block + " of file " + filename);
+            }
+            checksumPos += nRead;
           }
-          checksumPos += nRead;
+          checksumBuf.flip();
+
+          checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+        } finally {
+          buf.position(buf.limit());
         }
-        checksumBuf.flip();
-  
-        checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
-      } finally {
-        buf.position(buf.limit());
       }
+      return total;
+    } finally {
+      scope.close();
     }
-    return total;
   }
 
   private boolean createNoChecksumContext() {
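
A note on the Sampler.NEVER passed above: htrace consults the sampler only when the thread is not already tracing, so fillBuffer never starts a trace of its own but does contribute a child span whenever a client-level trace (begun in DFSClient with the configured sampler) is active. Reduced to its essentials:

    // Sampler.NEVER never votes to begin a new trace, so this yields a
    // real span only when the calling thread already has one in progress.
    TraceScope scope = Trace.startSpan("child", Sampler.NEVER);
    try {
      // scope.getSpan() is null here unless a parent trace was active.
    } finally {
      scope.close();   // safe in both cases
    }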

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 4745575..d42b860 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -46,6 +46,9 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 /**
  * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
@@ -169,6 +172,7 @@ class BlockReaderLocalLegacy implements BlockReader {
   /** offset in block where reader wants to actually read */
   private long startOffset;
   private final String filename;
+  private long blockId;
   
   /**
    * The only way this object can be instantiated.
@@ -320,6 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader {
     this.checksum = checksum;
     this.verifyChecksum = verifyChecksum;
     this.startOffset = Math.max(startOffset, 0);
+    this.blockId = block.getBlockId();
 
     bytesPerChecksum = this.checksum.getBytesPerChecksum();
     checksumSize = this.checksum.getChecksumSize();
@@ -357,20 +362,26 @@ class BlockReaderLocalLegacy implements BlockReader {
    */
   private int fillBuffer(FileInputStream stream, ByteBuffer buf)
       throws IOException {
-    int bytesRead = stream.getChannel().read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = stream.getChannel().read(buf);
-      if (n < 0) {
+    TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
+        blockId + ")", Sampler.NEVER);
+    try {
+      int bytesRead = stream.getChannel().read(buf);
+      if (bytesRead < 0) {
         //EOF
         return bytesRead;
       }
-      bytesRead += n;
+      while (buf.remaining() > 0) {
+        int n = stream.getChannel().read(buf);
+        if (n < 0) {
+          //EOF
+          return bytesRead;
+        }
+        bytesRead += n;
+      }
+      return bytesRead;
+    } finally {
+      scope.close();
     }
-    return bytesRead;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index c975ad5..86faf18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -72,12 +72,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Proxy;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
+import java.nio.charset.Charset;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -200,6 +202,7 @@ import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RpcInvocationHandler;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -207,6 +210,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.tracing.SpanReceiverHost;
+import org.apache.hadoop.tracing.TraceSamplerFactory;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
@@ -218,6 +223,11 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.net.InetAddresses;
+import org.htrace.Sampler;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+import org.htrace.impl.ProbabilitySampler;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -266,6 +276,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
   @VisibleForTesting
   KeyProvider provider;
+  private final SpanReceiverHost spanReceiverHost;
+  private final Sampler traceSampler;
+
   /**
    * DFSClient configuration 
    */
@@ -582,6 +595,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
       Configuration conf, FileSystem.Statistics stats)
     throws IOException {
+    spanReceiverHost = SpanReceiverHost.getInstance(conf);
+    traceSampler = TraceSamplerFactory.createSampler(conf);
     // Copy only the required DFSClient configuration
     this.dfsClientConf = new Conf(conf);
     if (this.dfsClientConf.useLegacyBlockReaderLocal) {
@@ -3158,4 +3173,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public SaslDataTransferClient getSaslDataTransferClient() {
     return saslClient;
   }
+
+  private static final byte[] PATH =
+      new String("path").getBytes(Charset.forName("UTF-8"));
+
+  TraceScope getPathTraceScope(String description, String path) {
+    TraceScope scope = Trace.startSpan(description, traceSampler);
+    Span span = scope.getSpan();
+    if (span != null) {
+      if (path != null) {
+        span.addKVAnnotation(PATH,
+            path.getBytes(Charset.forName("UTF-8")));
+      }
+    }
+    return scope;
+  }
 }
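
The helper attaches the file path as a key/value annotation on the span, so a trace viewer can tie client-side spans back to the file being read. A sketch of a call site within the org.apache.hadoop.hdfs package (the span name and path are illustrative; the real callers are in DFSInputStream, below):

    TraceScope scope = dfsClient.getPathTraceScope(
        "DFSInputStream#byteArrayRead", "/user/alice/data.txt");
    try {
      // If this span was sampled, it now carries a "path" annotation.
      // ... perform the read ...
    } finally {
      scope.close();
    }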

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index af1ba14..e8bcfcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -74,6 +74,9 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 /****************************************************************
  * DFSInputStream provides bytes from a named file.  It handles 
@@ -840,15 +843,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @Override
   public synchronized int read(final byte buf[], int off, int len) throws IOException {
     ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
-
-    return readWithStrategy(byteArrayReader, off, len);
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src);
+    try {
+      return readWithStrategy(byteArrayReader, off, len);
+    } finally {
+      scope.close();
+    }
   }
 
   @Override
   public synchronized int read(final ByteBuffer buf) throws IOException {
     ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
-
-    return readWithStrategy(byteBufferReader, 0, buf.remaining());
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
+    try {
+      return readWithStrategy(byteBufferReader, 0, buf.remaining());
+    } finally {
+      scope.close();
+    }
   }
 
 
@@ -984,15 +997,23 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final int hedgedReadId) {
+    final Span parentSpan = Trace.currentSpan();
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
         byte[] buf = bb.array();
         int offset = bb.position();
-        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
-            corruptedBlockMap);
-        return bb;
+        TraceScope scope =
+            Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
+        try {
+          actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+              corruptedBlockMap);
+          return bb;
+        } finally {
+          scope.close();
+        }
       }
     };
   }
@@ -1108,6 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
+    int hedgedReadId = 0;
     block = getBlockAt(block.getStartOffset(), false);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
@@ -1120,7 +1142,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block, start, end, bb, corruptedBlockMap);
+            chosenNode, block, start, end, bb, corruptedBlockMap,
+            hedgedReadId++);
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
         futures.add(firstRequest);
@@ -1157,7 +1180,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           }
           bb = ByteBuffer.allocate(len);
           Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-              chosenNode, block, start, end, bb, corruptedBlockMap);
+              chosenNode, block, start, end, bb, corruptedBlockMap,
+              hedgedReadId++);
           Future<ByteBuffer> oneMoreRequest = hedgedService
               .submit(getFromDataNodeCallable);
           futures.add(oneMoreRequest);
@@ -1272,7 +1296,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    */
   @Override
   public int read(long position, byte[] buffer, int offset, int length)
-    throws IOException {
+      throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src);
+    try {
+      return pread(position, buffer, offset, length);
+    } finally {
+      scope.close();
+    }
+  }
+
+  private int pread(long position, byte[] buffer, int offset, int length)
+      throws IOException {
     // sanity checks
     dfsClient.checkOpen();
     if (closed) {
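
Because hedged reads run on a thread pool, the span active on the submitting thread would otherwise be lost; the code above captures Trace.currentSpan() before submission and re-parents the pool-side span explicitly. The general pattern, with doWork standing in for the actual datanode read:

    import java.nio.ByteBuffer;
    import java.util.concurrent.Callable;
    import org.htrace.Span;
    import org.htrace.Trace;
    import org.htrace.TraceScope;

    final Span parentSpan = Trace.currentSpan();  // captured on the caller's thread
    Callable<ByteBuffer> task = new Callable<ByteBuffer>() {
      @Override
      public ByteBuffer call() throws Exception {
        // Re-attach so the child lands in the same trace despite the thread hop.
        TraceScope scope = Trace.startSpan("hedgedRead0", parentSpan);
        try {
          return doWork();
        } finally {
          scope.close();
        }
      }
    };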

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 25a7287..f2d3395 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -46,6 +46,9 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 
 /**
@@ -69,6 +72,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   /** offset in block where reader wants to actually read */
   private long startOffset;
 
+  private final long blockId;
+
   /** offset in block of of first chunk - may be less than startOffset
       if startOffset is not chunk-aligned */
   private final long firstChunkOffset;
@@ -208,6 +213,19 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
   protected synchronized int readChunk(long pos, byte[] buf, int offset, 
                                        int len, byte[] checksumBuf) 
                                        throws IOException {
+    TraceScope scope =
+        Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
+            Sampler.NEVER);
+    try {
+      return readChunkImpl(pos, buf, offset, len, checksumBuf);
+    } finally {
+      scope.close();
+    }
+  }
+
+  private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
+                                     int len, byte[] checksumBuf)
+                                     throws IOException {
     // Read one chunk.
     if (eos) {
       // Already hit EOF
@@ -347,6 +365,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
     this.in = in;
     this.checksum = checksum;
     this.startOffset = Math.max( startOffset, 0 );
+    this.blockId = blockId;
 
     // The total number of bytes that we need to transfer from the DN is
     // the amount that the user wants (bytesToRead), plus the padding at
@@ -367,7 +386,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
    *
-   * @param sock  An established Socket to the DN. The BlockReader will not close it normally
    * @param file  File location
    * @param block  The block object
    * @param blockToken  The block token for security

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 2361f0a..bc0db56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -53,6 +53,9 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 /**
  * This is a wrapper around connection to datanode
@@ -88,6 +91,7 @@ public class RemoteBlockReader2  implements BlockReader {
   final private Peer peer;
   final private DatanodeID datanodeID;
   final private PeerCache peerCache;
+  final private long blockId;
   private final ReadableByteChannel in;
   private DataChecksum checksum;
   
@@ -143,7 +147,13 @@ public class RemoteBlockReader2  implements BlockReader {
     }
 
     if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-      readNextPacket();
+      TraceScope scope = Trace.startSpan(
+          "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+      try {
+        readNextPacket();
+      } finally {
+        scope.close();
+      }
     }
 
     if (LOG.isTraceEnabled()) {
@@ -165,7 +175,13 @@ public class RemoteBlockReader2  implements BlockReader {
   @Override
   public int read(ByteBuffer buf) throws IOException {
     if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
-      readNextPacket();
+      TraceScope scope = Trace.startSpan(
+          "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+      try {
+        readNextPacket();
+      } finally {
+        scope.close();
+      }
     }
     if (curDataSlice.remaining() == 0) {
       // we're at EOF now
@@ -289,6 +305,7 @@ public class RemoteBlockReader2  implements BlockReader {
     this.startOffset = Math.max( startOffset, 0 );
     this.filename = file;
     this.peerCache = peerCache;
+    this.blockId = blockId;
 
     // The total number of bytes that we need to transfer from the DN is
     // the amount that the user wants (bytesToRead), plus the padding at
@@ -372,8 +389,6 @@ public class RemoteBlockReader2  implements BlockReader {
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
    *
-   * @param sock  An established Socket to the DN. The BlockReader will not close it normally.
-   *             This socket must have an associated Channel.
    * @param file  File location
    * @param block  The block object
    * @param blockToken  The block token for security

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f6ed7fe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index febf2de..0082fcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -47,6 +47,9 @@ import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
 
 /**
  * Reads a block from the disk and sends it to a recipient.
@@ -668,6 +671,17 @@ class BlockSender implements java.io.Closeable {
    */
   long sendBlock(DataOutputStream out, OutputStream baseStream, 
                  DataTransferThrottler throttler) throws IOException {
+    TraceScope scope =
+        Trace.startSpan("sendBlock_" + block.getBlockId(), Sampler.NEVER);
+    try {
+      return doSendBlock(out, baseStream, throttler);
+    } finally {
+      scope.close();
+    }
+  }
+
+  private long doSendBlock(DataOutputStream out, OutputStream baseStream,
+        DataTransferThrottler throttler) throws IOException {
     if (out == null) {
       throw new IOException( "out stream is null" );
     }