Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/03 22:38:22 UTC
git commit: HDFS-7055. Add tracing to DFSInputStream (cmccabe)
(cherry picked from commit 7f6ed7fe365166e8075359f1d0ad035fa876c70f)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 7ed61e150 -> b335df9a9
HDFS-7055. Add tracing to DFSInputStream (cmccabe)
(cherry picked from commit 7f6ed7fe365166e8075359f1d0ad035fa876c70f)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b335df9a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b335df9a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b335df9a
Branch: refs/heads/branch-2
Commit: b335df9a9b632f87e7bbd29b1e56baf1ce1d91d8
Parents: 7ed61e1
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Fri Oct 3 13:28:24 2014 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Oct 3 13:38:15 2014 -0700
----------------------------------------------------------------------
.../hadoop/fs/CommonConfigurationKeys.java | 3 +
.../apache/hadoop/tracing/SpanReceiverHost.java | 49 +++++++++++-
.../hadoop/tracing/TraceSamplerFactory.java | 53 +++++++++++++
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 1 +
.../apache/hadoop/hdfs/BlockReaderLocal.java | 83 +++++++++++---------
.../hadoop/hdfs/BlockReaderLocalLegacy.java | 31 +++++---
.../java/org/apache/hadoop/hdfs/DFSClient.java | 30 +++++++
.../org/apache/hadoop/hdfs/DFSInputStream.java | 57 +++++++++++---
.../apache/hadoop/hdfs/RemoteBlockReader.java | 20 ++++-
.../apache/hadoop/hdfs/RemoteBlockReader2.java | 23 +++++-
.../hdfs/server/datanode/BlockSender.java | 14 ++++
11 files changed, 297 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 442dc7d..748c552 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -286,4 +286,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY_DEFAULT = "* rw";
+
+ public static final String HADOOP_TRACE_SAMPLER = "hadoop.htrace.sampler";
+ public static final String HADOOP_TRACE_SAMPLER_DEFAULT = "NeverSampler";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
index d912504..82f099e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/SpanReceiverHost.java
@@ -17,24 +17,30 @@
*/
package org.apache.hadoop.tracing;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.tracing.SpanReceiverInfo.ConfigurationPair;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.htrace.HTraceConfiguration;
import org.htrace.SpanReceiver;
import org.htrace.Trace;
-
/**
* This class provides functions for reading the names of SpanReceivers from
@@ -45,7 +51,7 @@ import org.htrace.Trace;
@InterfaceAudience.Private
public class SpanReceiverHost implements TraceAdminProtocol {
public static final String SPAN_RECEIVERS_CONF_KEY =
- "hadoop.trace.spanreceiver.classes";
+ "hadoop.htrace.spanreceiver.classes";
private static final Log LOG = LogFactory.getLog(SpanReceiverHost.class);
private final TreeMap<Long, SpanReceiver> receivers =
new TreeMap<Long, SpanReceiver>();
@@ -53,6 +59,9 @@ public class SpanReceiverHost implements TraceAdminProtocol {
private boolean closed = false;
private long highestId = 1;
+ private final static String LOCAL_FILE_SPAN_RECEIVER_PATH =
+ "hadoop.htrace.local-file-span-receiver.path";
+
private static enum SingletonHolder {
INSTANCE;
Object lock = new Object();
@@ -81,9 +90,32 @@ public class SpanReceiverHost implements TraceAdminProtocol {
private static List<ConfigurationPair> EMPTY = Collections.emptyList();
+ private static String getUniqueLocalTraceFileName() {
+ String tmp = System.getProperty("java.io.tmpdir", "/tmp");
+ String nonce = null;
+ BufferedReader reader = null;
+ try {
+ // On Linux we can get a unique local file name by reading the process id
+ // out of /proc/self/stat. (There isn't any portable way to get the
+ // process ID from Java.)
+ reader = new BufferedReader(
+ new InputStreamReader(new FileInputStream("/proc/self/stat")));
+ String line = reader.readLine();
+ nonce = line.split(" ")[0];
+ } catch (IOException e) {
+ } finally {
+ IOUtils.cleanup(LOG, reader);
+ }
+ if (nonce == null) {
+ // If we can't use the process ID, use a random nonce.
+ nonce = UUID.randomUUID().toString();
+ }
+ return new File(tmp, nonce).getAbsolutePath();
+ }
+
/**
* Reads the names of classes specified in the
- * "hadoop.trace.spanreceiver.classes" property and instantiates and registers
+ * "hadoop.htrace.spanreceiver.classes" property and instantiates and registers
* them with the Tracer as SpanReceiver's.
*
* The nullary constructor is called during construction, but if the classes
@@ -98,8 +130,17 @@ public class SpanReceiverHost implements TraceAdminProtocol {
if (receiverNames == null || receiverNames.length == 0) {
return;
}
+ // It's convenient to have each daemon log to a random trace file when
+ // testing.
+ if (config.get(LOCAL_FILE_SPAN_RECEIVER_PATH) == null) {
+ config.set(LOCAL_FILE_SPAN_RECEIVER_PATH,
+ getUniqueLocalTraceFileName());
+ }
for (String className : receiverNames) {
className = className.trim();
+ if (!className.contains(".")) {
+ className = "org.htrace.impl." + className;
+ }
try {
SpanReceiver rcvr = loadInstance(className, EMPTY);
Trace.addReceiver(rcvr);
@@ -145,7 +186,7 @@ public class SpanReceiverHost implements TraceAdminProtocol {
extraMap.put(pair.getKey(), pair.getValue());
}
return new HTraceConfiguration() {
- public static final String HTRACE_CONF_PREFIX = "hadoop.";
+ public static final String HTRACE_CONF_PREFIX = "hadoop.htrace.";
@Override
public String get(String key) {
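
With these changes, span receivers are configured under the "hadoop.htrace." prefix and can be named without a package (short names are expanded with the "org.htrace.impl." prefix). A minimal sketch, not part of this patch, of wiring up htrace's LocalFileSpanReceiver for testing; the receiver class name and output path are illustrative assumptions:

    Configuration conf = new Configuration();
    // Short names are expanded to "org.htrace.impl.<name>" by SpanReceiverHost.
    conf.set(SpanReceiverHost.SPAN_RECEIVERS_CONF_KEY, "LocalFileSpanReceiver");
    // If left unset, each daemon writes to a unique file under java.io.tmpdir
    // (see getUniqueLocalTraceFileName above).
    conf.set("hadoop.htrace.local-file-span-receiver.path", "/tmp/htrace.out");
    SpanReceiverHost host = SpanReceiverHost.getInstance(conf);
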
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java
new file mode 100644
index 0000000..0de7d3e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tracing/TraceSamplerFactory.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tracing;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.htrace.Sampler;
+import org.htrace.impl.ProbabilitySampler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class TraceSamplerFactory {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TraceSamplerFactory.class);
+
+ public static Sampler createSampler(Configuration conf) {
+ String samplerStr = conf.get(CommonConfigurationKeys.HADOOP_TRACE_SAMPLER,
+ CommonConfigurationKeys.HADOOP_TRACE_SAMPLER_DEFAULT);
+ if (samplerStr.equals("NeverSampler")) {
+ LOG.debug("HTrace is OFF for all spans.");
+ return Sampler.NEVER;
+ } else if (samplerStr.equals("AlwaysSampler")) {
+ LOG.info("HTrace is ON for all spans.");
+ return Sampler.ALWAYS;
+ } else if (samplerStr.equals("ProbabilitySampler")) {
+ double percentage =
+ conf.getDouble("htrace.probability.sampler.percentage", 0.01d);
+ LOG.info("HTrace is ON for " + percentage + "% of top-level spans.");
+ return new ProbabilitySampler(percentage / 100.0d);
+ } else {
+ throw new RuntimeException("Can't create sampler " + samplerStr +
+ ". Available samplers are NeverSampler, AlwaysSampler, " +
+ "and ProbabilitySampler.");
+ }
+ }
+}
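
A minimal sketch, not part of this patch, of selecting a sampler through the new keys; the percentage value is illustrative, and as the code above shows it is divided by 100 before being handed to ProbabilitySampler:

    Configuration conf = new Configuration();
    conf.set(CommonConfigurationKeys.HADOOP_TRACE_SAMPLER, "ProbabilitySampler");
    // Roughly 1 in 100 top-level operations will start a new trace.
    conf.setDouble("htrace.probability.sampler.percentage", 1.0d);
    Sampler sampler = TraceSamplerFactory.createSampler(conf);
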
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 3b1421f..fa2cbbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -7,6 +7,7 @@ Release 2.7.0 - UNRELEASED
NEW FEATURES
IMPROVEMENTS
+ HDFS-7055. Add tracing to DFSInputStream (cmccabe)
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index cd75e53..3954755 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -36,6 +36,9 @@ import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
* BlockReaderLocal enables local short circuited reads. If the DFS client is on
@@ -304,48 +307,54 @@ class BlockReaderLocal implements BlockReader {
*/
private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
throws IOException {
- int total = 0;
- long startDataPos = dataPos;
- int startBufPos = buf.position();
- while (buf.hasRemaining()) {
- int nRead = dataIn.read(buf, dataPos);
- if (nRead < 0) {
- break;
+ TraceScope scope = Trace.startSpan("BlockReaderLocal#fillBuffer(" +
+ block.getBlockId() + ")", Sampler.NEVER);
+ try {
+ int total = 0;
+ long startDataPos = dataPos;
+ int startBufPos = buf.position();
+ while (buf.hasRemaining()) {
+ int nRead = dataIn.read(buf, dataPos);
+ if (nRead < 0) {
+ break;
+ }
+ dataPos += nRead;
+ total += nRead;
}
- dataPos += nRead;
- total += nRead;
- }
- if (canSkipChecksum) {
- freeChecksumBufIfExists();
- return total;
- }
- if (total > 0) {
- try {
- buf.limit(buf.position());
- buf.position(startBufPos);
- createChecksumBufIfNeeded();
- int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
- checksumBuf.clear();
- checksumBuf.limit(checksumsNeeded * checksumSize);
- long checksumPos =
- 7 + ((startDataPos / bytesPerChecksum) * checksumSize);
- while (checksumBuf.hasRemaining()) {
- int nRead = checksumIn.read(checksumBuf, checksumPos);
- if (nRead < 0) {
- throw new IOException("Got unexpected checksum file EOF at " +
- checksumPos + ", block file position " + startDataPos + " for " +
- "block " + block + " of file " + filename);
+ if (canSkipChecksum) {
+ freeChecksumBufIfExists();
+ return total;
+ }
+ if (total > 0) {
+ try {
+ buf.limit(buf.position());
+ buf.position(startBufPos);
+ createChecksumBufIfNeeded();
+ int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+ checksumBuf.clear();
+ checksumBuf.limit(checksumsNeeded * checksumSize);
+ long checksumPos =
+ 7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+ while (checksumBuf.hasRemaining()) {
+ int nRead = checksumIn.read(checksumBuf, checksumPos);
+ if (nRead < 0) {
+ throw new IOException("Got unexpected checksum file EOF at " +
+ checksumPos + ", block file position " + startDataPos + " for " +
+ "block " + block + " of file " + filename);
+ }
+ checksumPos += nRead;
}
- checksumPos += nRead;
+ checksumBuf.flip();
+
+ checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+ } finally {
+ buf.position(buf.limit());
}
- checksumBuf.flip();
-
- checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
- } finally {
- buf.position(buf.limit());
}
+ return total;
+ } finally {
+ scope.close();
}
- return total;
}
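
The wrapping idiom above recurs in each reader touched by this patch. A standalone sketch, assuming htrace 3.x semantics where Sampler.NEVER means a new span is recorded only if the calling thread already has an active trace, so these low-level spans appear as children of a sampled top-level read:

    TraceScope scope = Trace.startSpan("exampleChildOperation", Sampler.NEVER);
    try {
      // Work to be timed; with Sampler.NEVER this records nothing unless a
      // surrounding trace is already active on this thread.
    } finally {
      scope.close();
    }
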
private boolean createNoChecksumContext() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
index 4745575..d42b860 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
@@ -46,6 +46,9 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
* BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
@@ -169,6 +172,7 @@ class BlockReaderLocalLegacy implements BlockReader {
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
+ private long blockId;
/**
* The only way this object can be instantiated.
@@ -320,6 +324,7 @@ class BlockReaderLocalLegacy implements BlockReader {
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max(startOffset, 0);
+ this.blockId = block.getBlockId();
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
@@ -357,20 +362,26 @@ class BlockReaderLocalLegacy implements BlockReader {
*/
private int fillBuffer(FileInputStream stream, ByteBuffer buf)
throws IOException {
- int bytesRead = stream.getChannel().read(buf);
- if (bytesRead < 0) {
- //EOF
- return bytesRead;
- }
- while (buf.remaining() > 0) {
- int n = stream.getChannel().read(buf);
- if (n < 0) {
+ TraceScope scope = Trace.startSpan("BlockReaderLocalLegacy#fillBuffer(" +
+ blockId + ")", Sampler.NEVER);
+ try {
+ int bytesRead = stream.getChannel().read(buf);
+ if (bytesRead < 0) {
//EOF
return bytesRead;
}
- bytesRead += n;
+ while (buf.remaining() > 0) {
+ int n = stream.getChannel().read(buf);
+ if (n < 0) {
+ //EOF
+ return bytesRead;
+ }
+ bytesRead += n;
+ }
+ return bytesRead;
+ } finally {
+ scope.close();
}
- return bytesRead;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index c975ad5..86faf18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -72,12 +72,14 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.lang.reflect.Proxy;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
+import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
@@ -200,6 +202,7 @@ import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@@ -207,6 +210,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.tracing.SpanReceiverHost;
+import org.apache.hadoop.tracing.TraceSamplerFactory;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
@@ -218,6 +223,11 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.net.InetAddresses;
+import org.htrace.Sampler;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
+import org.htrace.impl.ProbabilitySampler;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
@@ -266,6 +276,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;
@VisibleForTesting
KeyProvider provider;
+ private final SpanReceiverHost spanReceiverHost;
+ private final Sampler traceSampler;
+
/**
* DFSClient configuration
*/
@@ -582,6 +595,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
+ spanReceiverHost = SpanReceiverHost.getInstance(conf);
+ traceSampler = TraceSamplerFactory.createSampler(conf);
// Copy only the required DFSClient configuration
this.dfsClientConf = new Conf(conf);
if (this.dfsClientConf.useLegacyBlockReaderLocal) {
@@ -3158,4 +3173,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public SaslDataTransferClient getSaslDataTransferClient() {
return saslClient;
}
+
+ private static final byte[] PATH =
+ new String("path").getBytes(Charset.forName("UTF-8"));
+
+ TraceScope getPathTraceScope(String description, String path) {
+ TraceScope scope = Trace.startSpan(description, traceSampler);
+ Span span = scope.getSpan();
+ if (span != null) {
+ if (path != null) {
+ span.addKVAnnotation(PATH,
+ path.getBytes(Charset.forName("UTF-8")));
+ }
+ }
+ return scope;
+ }
}
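
getPathTraceScope is the hook the DFSInputStream changes below use to open a span that carries the file path as a UTF-8 "path" annotation. A minimal sketch, not part of this patch, of the intended call pattern; the span description here is illustrative:

    TraceScope scope = dfsClient.getPathTraceScope("DFSInputStream#someRead", src);
    try {
      // Perform the read; if the sampler chose to trace, the span records the
      // elapsed time along with the path annotation.
    } finally {
      scope.close();
    }
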
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index af1ba14..e8bcfcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -74,6 +74,9 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
import com.google.common.annotations.VisibleForTesting;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
@@ -840,15 +843,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
@Override
public synchronized int read(final byte buf[], int off, int len) throws IOException {
ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
-
- return readWithStrategy(byteArrayReader, off, len);
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src);
+ try {
+ return readWithStrategy(byteArrayReader, off, len);
+ } finally {
+ scope.close();
+ }
}
@Override
public synchronized int read(final ByteBuffer buf) throws IOException {
ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
-
- return readWithStrategy(byteBufferReader, 0, buf.remaining());
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src);
+ try {
+ return readWithStrategy(byteBufferReader, 0, buf.remaining());
+ } finally {
+ scope.close();
+ }
}
@@ -984,15 +997,23 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
final LocatedBlock block, final long start, final long end,
final ByteBuffer bb,
- final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+ final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+ final int hedgedReadId) {
+ final Span parentSpan = Trace.currentSpan();
return new Callable<ByteBuffer>() {
@Override
public ByteBuffer call() throws Exception {
byte[] buf = bb.array();
int offset = bb.position();
- actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
- corruptedBlockMap);
- return bb;
+ TraceScope scope =
+ Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
+ try {
+ actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+ corruptedBlockMap);
+ return bb;
+ } finally {
+ scope.close();
+ }
}
};
}
@@ -1108,6 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
ByteBuffer bb = null;
int len = (int) (end - start + 1);
+ int hedgedReadId = 0;
block = getBlockAt(block.getStartOffset(), false);
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
@@ -1120,7 +1142,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb, corruptedBlockMap);
+ chosenNode, block, start, end, bb, corruptedBlockMap,
+ hedgedReadId++);
Future<ByteBuffer> firstRequest = hedgedService
.submit(getFromDataNodeCallable);
futures.add(firstRequest);
@@ -1157,7 +1180,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
}
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
- chosenNode, block, start, end, bb, corruptedBlockMap);
+ chosenNode, block, start, end, bb, corruptedBlockMap,
+ hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable);
futures.add(oneMoreRequest);
@@ -1272,7 +1296,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
*/
@Override
public int read(long position, byte[] buffer, int offset, int length)
- throws IOException {
+ throws IOException {
+ TraceScope scope =
+ dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src);
+ try {
+ return pread(position, buffer, offset, length);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private int pread(long position, byte[] buffer, int offset, int length)
+ throws IOException {
// sanity checks
dfsClient.checkOpen();
if (closed) {
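
The hedged-read path is the one place this patch carries trace context across threads: the Callable captures Trace.currentSpan() on the submitting thread and reattaches it inside call() via the two-argument Trace.startSpan(description, parentSpan). A minimal standalone sketch of that pattern, not part of this patch; the executor and task names are illustrative:

    final Span parent = Trace.currentSpan();  // null if no trace is active
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<Void> f = executor.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        // Work done on the executor thread shows up as a child of the
        // submitting thread's span (or stays untraced if parent is null).
        TraceScope scope = Trace.startSpan("hedgedWork", parent);
        try {
          // ... do the speculative read ...
          return null;
        } finally {
          scope.close();
        }
      }
    });
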
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
index 25a7287..f2d3395 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
@@ -46,6 +46,9 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
@@ -69,6 +72,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
/** offset in block where reader wants to actually read */
private long startOffset;
+ private final long blockId;
+
/** offset in block of of first chunk - may be less than startOffset
if startOffset is not chunk-aligned */
private final long firstChunkOffset;
@@ -208,6 +213,19 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
protected synchronized int readChunk(long pos, byte[] buf, int offset,
int len, byte[] checksumBuf)
throws IOException {
+ TraceScope scope =
+ Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")",
+ Sampler.NEVER);
+ try {
+ return readChunkImpl(pos, buf, offset, len, checksumBuf);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private synchronized int readChunkImpl(long pos, byte[] buf, int offset,
+ int len, byte[] checksumBuf)
+ throws IOException {
// Read one chunk.
if (eos) {
// Already hit EOF
@@ -347,6 +365,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
this.in = in;
this.checksum = checksum;
this.startOffset = Math.max( startOffset, 0 );
+ this.blockId = blockId;
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
@@ -367,7 +386,6 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
*
- * @param sock An established Socket to the DN. The BlockReader will not close it normally
* @param file File location
* @param block The block object
* @param blockToken The block token for security
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 2361f0a..bc0db56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -53,6 +53,9 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
* This is a wrapper around connection to datanode
@@ -88,6 +91,7 @@ public class RemoteBlockReader2 implements BlockReader {
final private Peer peer;
final private DatanodeID datanodeID;
final private PeerCache peerCache;
+ final private long blockId;
private final ReadableByteChannel in;
private DataChecksum checksum;
@@ -143,7 +147,13 @@ public class RemoteBlockReader2 implements BlockReader {
}
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
- readNextPacket();
+ TraceScope scope = Trace.startSpan(
+ "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+ try {
+ readNextPacket();
+ } finally {
+ scope.close();
+ }
}
if (LOG.isTraceEnabled()) {
@@ -165,7 +175,13 @@ public class RemoteBlockReader2 implements BlockReader {
@Override
public int read(ByteBuffer buf) throws IOException {
if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
- readNextPacket();
+ TraceScope scope = Trace.startSpan(
+ "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
+ try {
+ readNextPacket();
+ } finally {
+ scope.close();
+ }
}
if (curDataSlice.remaining() == 0) {
// we're at EOF now
@@ -289,6 +305,7 @@ public class RemoteBlockReader2 implements BlockReader {
this.startOffset = Math.max( startOffset, 0 );
this.filename = file;
this.peerCache = peerCache;
+ this.blockId = blockId;
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
@@ -372,8 +389,6 @@ public class RemoteBlockReader2 implements BlockReader {
* Create a new BlockReader specifically to satisfy a read.
* This method also sends the OP_READ_BLOCK request.
*
- * @param sock An established Socket to the DN. The BlockReader will not close it normally.
- * This socket must have an associated Channel.
* @param file File location
* @param block The block object
* @param blockToken The block token for security
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b335df9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index febf2de..0082fcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -47,6 +47,9 @@ import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
* Reads a block from the disk and sends it to a recipient.
@@ -668,6 +671,17 @@ class BlockSender implements java.io.Closeable {
*/
long sendBlock(DataOutputStream out, OutputStream baseStream,
DataTransferThrottler throttler) throws IOException {
+ TraceScope scope =
+ Trace.startSpan("sendBlock_" + block.getBlockId(), Sampler.NEVER);
+ try {
+ return doSendBlock(out, baseStream, throttler);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
+ DataTransferThrottler throttler) throws IOException {
if (out == null) {
throw new IOException( "out stream is null" );
}