Posted to common-commits@hadoop.apache.org by yl...@apache.org on 2015/01/16 02:15:50 UTC
hadoop git commit: HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P. McCabe via yliu)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 2e4df8710 -> bdbf13ac4
HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P. McCabe via yliu)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bdbf13ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bdbf13ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bdbf13ac
Branch: refs/heads/branch-2
Commit: bdbf13ac46b72ae07ba86b8a3672cc8a9d064afc
Parents: 2e4df87
Author: yliu <yl...@apache.org>
Authored: Fri Jan 16 00:21:38 2015 +0800
Committer: yliu <yl...@apache.org>
Committed: Fri Jan 16 00:21:38 2015 +0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/BlockStorageLocationUtil.java | 19 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 404 ++++++++++++++++---
.../hadoop/hdfs/DFSInotifyEventInputStream.java | 144 ++++---
.../hdfs/protocol/CacheDirectiveIterator.java | 10 +-
.../hadoop/hdfs/protocol/CachePoolIterator.java | 14 +-
.../hdfs/protocol/EncryptionZoneIterator.java | 15 +-
.../server/namenode/TestCacheDirectives.java | 3 +-
8 files changed, 475 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
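Every hunk below applies the same pattern: each DFSClient metadata RPC is bracketed by an HTrace span that is opened before the NameNode call and closed in a finally block, so the span ends even when the RPC throws. A condensed sketch of that pattern, taken from the setReplication hunk further down (javadoc and the RemoteException unwrapping are elided; getPathTraceScope is the DFSClient helper used throughout this patch):

    import org.htrace.TraceScope;

    public boolean setReplication(String src, short replication)
        throws IOException {
      checkOpen();
      // Path-based operations get a span annotated with the file path.
      TraceScope scope = getPathTraceScope("setReplication", src);
      try {
        return namenode.setReplication(src, replication);
      } finally {
        scope.close();   // end the span whether the RPC succeeded or failed
      }
    }

Operations that take no path (e.g. concat, saveNamespace) start their span with Trace.startSpan(name, traceSampler) instead, as the hunks below show.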
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5a2a2cd..803a454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -247,6 +247,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7457. DatanodeID generates excessive garbage. (daryn via kihwal)
+ HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P.
+ McCabe via yliu)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
index ba74978..7f992c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
@@ -51,6 +51,10 @@ import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.htrace.Sampler;
+import org.htrace.Span;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -71,7 +75,7 @@ class BlockStorageLocationUtil {
*/
private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
- int timeout, boolean connectToDnViaHostname) {
+ int timeout, boolean connectToDnViaHostname, Span parent) {
if (datanodeBlocks.isEmpty()) {
return Lists.newArrayList();
@@ -111,7 +115,7 @@ class BlockStorageLocationUtil {
}
VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
conf, datanode, poolId, blockIds, dnTokens, timeout,
- connectToDnViaHostname);
+ connectToDnViaHostname, parent);
callables.add(callable);
}
return callables;
@@ -131,11 +135,11 @@ class BlockStorageLocationUtil {
static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int poolsize, int timeoutMs, boolean connectToDnViaHostname)
- throws InvalidBlockTokenException {
+ throws InvalidBlockTokenException {
List<VolumeBlockLocationCallable> callables =
createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
- connectToDnViaHostname);
+ connectToDnViaHostname, Trace.currentSpan());
// Use a thread pool to execute the Callables in parallel
List<Future<HdfsBlocksMetadata>> futures =
@@ -319,11 +323,12 @@ class BlockStorageLocationUtil {
private final long[] blockIds;
private final List<Token<BlockTokenIdentifier>> dnTokens;
private final boolean connectToDnViaHostname;
+ private final Span parentSpan;
VolumeBlockLocationCallable(Configuration configuration,
DatanodeInfo datanode, String poolId, long []blockIds,
List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
- boolean connectToDnViaHostname) {
+ boolean connectToDnViaHostname, Span parentSpan) {
this.configuration = configuration;
this.timeout = timeout;
this.datanode = datanode;
@@ -331,6 +336,7 @@ class BlockStorageLocationUtil {
this.blockIds = blockIds;
this.dnTokens = dnTokens;
this.connectToDnViaHostname = connectToDnViaHostname;
+ this.parentSpan = parentSpan;
}
public DatanodeInfo getDatanodeInfo() {
@@ -342,6 +348,8 @@ class BlockStorageLocationUtil {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
+ TraceScope scope =
+ Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname);
@@ -350,6 +358,7 @@ class BlockStorageLocationUtil {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
+ scope.close();
if (cdp != null) {
RPC.stopProxy(cdp);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index bd9622d..7369a48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1005,11 +1005,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#getPreferredBlockSize(String)
*/
public long getBlockSize(String f) throws IOException {
+ TraceScope scope = getPathTraceScope("getBlockSize", f);
try {
return namenode.getPreferredBlockSize(f);
} catch (IOException ie) {
LOG.warn("Problem getting block size", ie);
throw ie;
+ } finally {
+ scope.close();
}
}
@@ -1042,17 +1045,20 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
assert dtService != null;
- Token<DelegationTokenIdentifier> token =
- namenode.getDelegationToken(renewer);
-
- if (token != null) {
- token.setService(this.dtService);
- LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
- } else {
- LOG.info("Cannot get delegation token from " + renewer);
+ TraceScope scope = Trace.startSpan("getDelegationToken", traceSampler);
+ try {
+ Token<DelegationTokenIdentifier> token =
+ namenode.getDelegationToken(renewer);
+ if (token != null) {
+ token.setService(this.dtService);
+ LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+ } else {
+ LOG.info("Cannot get delegation token from " + renewer);
+ }
+ return token;
+ } finally {
+ scope.close();
}
- return token;
-
}
/**
@@ -1223,7 +1229,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
throws IOException {
- return callGetBlockLocations(namenode, src, start, length);
+ TraceScope scope = getPathTraceScope("getBlockLocations", src);
+ try {
+ return callGetBlockLocations(namenode, src, start, length);
+ } finally {
+ scope.close();
+ }
}
/**
@@ -1250,12 +1261,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
boolean recoverLease(String src) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("recoverLease", src);
try {
return namenode.recoverLease(src, clientName);
} catch (RemoteException re) {
throw re.unwrapRemoteException(FileNotFoundException.class,
AccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
@@ -1272,14 +1286,19 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* as the data-block the task processes.
*/
public BlockLocation[] getBlockLocations(String src, long start,
- long length) throws IOException, UnresolvedLinkException {
- LocatedBlocks blocks = getLocatedBlocks(src, start, length);
- BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
- HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
- for (int i = 0; i < locations.length; i++) {
- hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
+ long length) throws IOException, UnresolvedLinkException {
+ TraceScope scope = getPathTraceScope("getBlockLocations", src);
+ try {
+ LocatedBlocks blocks = getLocatedBlocks(src, start, length);
+ BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+ HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
+ for (int i = 0; i < locations.length; i++) {
+ hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
+ }
+ return hdfsLocations;
+ } finally {
+ scope.close();
}
- return hdfsLocations;
}
/**
@@ -1333,15 +1352,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
// Make RPCs to the datanodes to get volume locations for its replicas
- Map<DatanodeInfo, HdfsBlocksMetadata> metadatas = BlockStorageLocationUtil
- .queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
- getConf().getFileBlockStorageLocationsNumThreads,
- getConf().getFileBlockStorageLocationsTimeoutMs,
- getConf().connectToDnViaHostname);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("metadata returned: "
- + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
+ TraceScope scope =
+ Trace.startSpan("getBlockStorageLocations", traceSampler);
+ Map<DatanodeInfo, HdfsBlocksMetadata> metadatas;
+ try {
+ metadatas = BlockStorageLocationUtil.
+ queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks,
+ getConf().getFileBlockStorageLocationsNumThreads,
+ getConf().getFileBlockStorageLocationsTimeoutMs,
+ getConf().connectToDnViaHostname);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("metadata returned: "
+ + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas));
+ }
+ } finally {
+ scope.close();
}
// Regroup the returned VolumeId metadata to again be grouped by
@@ -1361,19 +1386,24 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo
feInfo) throws IOException {
- if (provider == null) {
- throw new IOException("No KeyProvider is configured, cannot access" +
- " an encrypted file");
- }
- EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
- feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
- feInfo.getEncryptedDataEncryptionKey());
+ TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler);
try {
- KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
- .createKeyProviderCryptoExtension(provider);
- return cryptoProvider.decryptEncryptedKey(ekv);
- } catch (GeneralSecurityException e) {
- throw new IOException(e);
+ if (provider == null) {
+ throw new IOException("No KeyProvider is configured, cannot access" +
+ " an encrypted file");
+ }
+ EncryptedKeyVersion ekv = EncryptedKeyVersion.createForDecryption(
+ feInfo.getKeyName(), feInfo.getEzKeyVersionName(), feInfo.getIV(),
+ feInfo.getEncryptedDataEncryptionKey());
+ try {
+ KeyProviderCryptoExtension cryptoProvider = KeyProviderCryptoExtension
+ .createKeyProviderCryptoExtension(provider);
+ return cryptoProvider.decryptEncryptedKey(ekv);
+ } catch (GeneralSecurityException e) {
+ throw new IOException(e);
+ }
+ } finally {
+ scope.close();
}
}
@@ -1511,7 +1541,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throws IOException, UnresolvedLinkException {
checkOpen();
// Get block info from namenode
- return new DFSInputStream(this, src, verifyChecksum);
+ TraceScope scope = getPathTraceScope("newDFSInputStream", src);
+ try {
+ return new DFSInputStream(this, src, verifyChecksum);
+ } finally {
+ scope.close();
+ }
}
/**
@@ -1744,6 +1779,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public void createSymlink(String target, String link, boolean createParent)
throws IOException {
+ TraceScope scope = getPathTraceScope("createSymlink", target);
try {
FsPermission dirPerm =
FsPermission.getDefault().applyUMask(dfsClientConf.uMask);
@@ -1757,6 +1793,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -1767,11 +1805,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public String getLinkTarget(String path) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getLinkTarget", path);
try {
return namenode.getLinkTarget(path);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class);
+ } finally {
+ scope.close();
}
}
@@ -1831,6 +1872,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public boolean setReplication(String src, short replication)
throws IOException {
+ TraceScope scope = getPathTraceScope("setReplication", src);
try {
return namenode.setReplication(src, replication);
} catch(RemoteException re) {
@@ -1840,6 +1882,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -1850,6 +1894,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public void setStoragePolicy(String src, String policyName)
throws IOException {
+ TraceScope scope = getPathTraceScope("setStoragePolicy", src);
try {
namenode.setStoragePolicy(src, policyName);
} catch (RemoteException e) {
@@ -1859,6 +1904,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
NSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -1866,7 +1913,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @return All the existing storage policies
*/
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
- return namenode.getStoragePolicies();
+ TraceScope scope = Trace.startSpan("getStoragePolicies", traceSampler);
+ try {
+ return namenode.getStoragePolicies();
+ } finally {
+ scope.close();
+ }
}
/**
@@ -1877,6 +1929,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
@Deprecated
public boolean rename(String src, String dst) throws IOException {
checkOpen();
+ TraceScope scope = getSrcDstTraceScope("rename", src, dst);
try {
return namenode.rename(src, dst);
} catch(RemoteException re) {
@@ -1885,6 +1938,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -1894,12 +1949,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public void concat(String trg, String [] srcs) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("concat", traceSampler);
try {
namenode.concat(trg, srcs);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
/**
@@ -1909,6 +1967,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public void rename(String src, String dst, Options.Rename... options)
throws IOException {
checkOpen();
+ TraceScope scope = getSrcDstTraceScope("rename2", src, dst);
try {
namenode.rename2(src, dst, options);
} catch(RemoteException re) {
@@ -1921,6 +1980,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
NSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
/**
@@ -1930,7 +1991,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
@Deprecated
public boolean delete(String src) throws IOException {
checkOpen();
- return namenode.delete(src, true);
+ return delete(src, true);
}
/**
@@ -1942,6 +2003,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public boolean delete(String src, boolean recursive) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("delete", src);
try {
return namenode.delete(src, recursive);
} catch(RemoteException re) {
@@ -1950,6 +2012,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -1979,15 +2043,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#getListing(String, byte[], boolean)
*/
public DirectoryListing listPaths(String src, byte[] startAfter,
- boolean needLocation)
- throws IOException {
+ boolean needLocation) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("listPaths", src);
try {
return namenode.getListing(src, startAfter, needLocation);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
@@ -2001,12 +2067,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public HdfsFileStatus getFileInfo(String src) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getFileInfo", src);
try {
return namenode.getFileInfo(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
@@ -2016,12 +2085,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public boolean isFileClosed(String src) throws IOException{
checkOpen();
+ TraceScope scope = getPathTraceScope("isFileClosed", src);
try {
return namenode.isFileClosed(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
@@ -2035,12 +2107,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getFileLinkInfo", src);
try {
return namenode.getFileLinkInfo(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
UnresolvedPathException.class);
- }
+ } finally {
+ scope.close();
+ }
}
@InterfaceAudience.Private
@@ -2337,6 +2412,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public void setPermission(String src, FsPermission permission)
throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("setPermission", src);
try {
namenode.setPermission(src, permission);
} catch(RemoteException re) {
@@ -2345,6 +2421,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -2359,6 +2437,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public void setOwner(String src, String username, String groupname)
throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("setOwner", src);
try {
namenode.setOwner(src, username, groupname);
} catch(RemoteException re) {
@@ -2367,6 +2446,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
+ }
+ }
+
+ private long[] callGetStats() throws IOException {
+ checkOpen();
+ TraceScope scope = Trace.startSpan("getStats", traceSampler);
+ try {
+ return namenode.getStats();
+ } finally {
+ scope.close();
}
}
@@ -2374,7 +2465,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#getStats()
*/
public FsStatus getDiskStatus() throws IOException {
- long rawNums[] = namenode.getStats();
+ long rawNums[] = callGetStats();
return new FsStatus(rawNums[0], rawNums[1], rawNums[2]);
}
@@ -2384,7 +2475,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @throws IOException
*/
public long getMissingBlocksCount() throws IOException {
- return namenode.getStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
+ return callGetStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
}
/**
@@ -2393,7 +2484,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @throws IOException
*/
public long getMissingReplOneBlocksCount() throws IOException {
- return namenode.getStats()[ClientProtocol.
+ return callGetStats()[ClientProtocol.
GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX];
}
@@ -2402,7 +2493,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @throws IOException
*/
public long getUnderReplicatedBlocksCount() throws IOException {
- return namenode.getStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX];
+ return callGetStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX];
}
/**
@@ -2410,7 +2501,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @throws IOException
*/
public long getCorruptBlocksCount() throws IOException {
- return namenode.getStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
+ return callGetStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
}
/**
@@ -2419,18 +2510,37 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public CorruptFileBlocks listCorruptFileBlocks(String path,
String cookie)
- throws IOException {
- return namenode.listCorruptFileBlocks(path, cookie);
+ throws IOException {
+ checkOpen();
+ TraceScope scope = getPathTraceScope("listCorruptFileBlocks", path);
+ try {
+ return namenode.listCorruptFileBlocks(path, cookie);
+ } finally {
+ scope.close();
+ }
}
public DatanodeInfo[] datanodeReport(DatanodeReportType type)
- throws IOException {
- return namenode.getDatanodeReport(type);
+ throws IOException {
+ checkOpen();
+ TraceScope scope = Trace.startSpan("datanodeReport", traceSampler);
+ try {
+ return namenode.getDatanodeReport(type);
+ } finally {
+ scope.close();
+ }
}
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
- return namenode.getDatanodeStorageReport(type);
+ checkOpen();
+ TraceScope scope =
+ Trace.startSpan("datanodeStorageReport", traceSampler);
+ try {
+ return namenode.getDatanodeStorageReport(type);
+ } finally {
+ scope.close();
+ }
}
/**
@@ -2454,7 +2564,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, boolean)
*/
public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{
- return namenode.setSafeMode(action, isChecked);
+ TraceScope scope =
+ Trace.startSpan("setSafeMode", traceSampler);
+ try {
+ return namenode.setSafeMode(action, isChecked);
+ } finally {
+ scope.close();
+ }
}
/**
@@ -2468,10 +2584,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public String createSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("createSnapshot", traceSampler);
try {
return namenode.createSnapshot(snapshotRoot, snapshotName);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
@@ -2486,10 +2605,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
+ checkOpen();
+ TraceScope scope = Trace.startSpan("deleteSnapshot", traceSampler);
try {
namenode.deleteSnapshot(snapshotRoot, snapshotName);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
@@ -2504,10 +2627,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public void renameSnapshot(String snapshotDir, String snapshotOldName,
String snapshotNewName) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("renameSnapshot", traceSampler);
try {
namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
@@ -2520,10 +2646,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("getSnapshottableDirListing",
+ traceSampler);
try {
return namenode.getSnapshottableDirListing();
} catch(RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
@@ -2534,10 +2664,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public void allowSnapshot(String snapshotRoot) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("allowSnapshot", traceSampler);
try {
namenode.allowSnapshot(snapshotRoot);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
@@ -2548,10 +2681,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public void disallowSnapshot(String snapshotRoot) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("disallowSnapshot", traceSampler);
try {
namenode.disallowSnapshot(snapshotRoot);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
@@ -2563,78 +2699,99 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir,
String fromSnapshot, String toSnapshot) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("getSnapshotDiffReport", traceSampler);
try {
return namenode.getSnapshotDiffReport(snapshotDir,
fromSnapshot, toSnapshot);
} catch(RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
public long addCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("addCacheDirective", traceSampler);
try {
return namenode.addCacheDirective(info, flags);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
public void modifyCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("modifyCacheDirective", traceSampler);
try {
namenode.modifyCacheDirective(info, flags);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
public void removeCacheDirective(long id)
throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("removeCacheDirective", traceSampler);
try {
namenode.removeCacheDirective(id);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException {
- return new CacheDirectiveIterator(namenode, filter);
+ return new CacheDirectiveIterator(namenode, filter, traceSampler);
}
public void addCachePool(CachePoolInfo info) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("addCachePool", traceSampler);
try {
namenode.addCachePool(info);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
public void modifyCachePool(CachePoolInfo info) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("modifyCachePool", traceSampler);
try {
namenode.modifyCachePool(info);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
public void removeCachePool(String poolName) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("removeCachePool", traceSampler);
try {
namenode.removeCachePool(poolName);
} catch (RemoteException re) {
throw re.unwrapRemoteException();
+ } finally {
+ scope.close();
}
}
public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
- return new CachePoolIterator(namenode);
+ return new CachePoolIterator(namenode, traceSampler);
}
/**
@@ -2643,10 +2800,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#saveNamespace()
*/
void saveNamespace() throws AccessControlException, IOException {
+ TraceScope scope = Trace.startSpan("saveNamespace", traceSampler);
try {
namenode.saveNamespace();
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -2657,10 +2817,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#rollEdits()
*/
long rollEdits() throws AccessControlException, IOException {
+ TraceScope scope = Trace.startSpan("rollEdits", traceSampler);
try {
return namenode.rollEdits();
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -2676,7 +2839,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
boolean restoreFailedStorage(String arg)
throws AccessControlException, IOException{
- return namenode.restoreFailedStorage(arg);
+ TraceScope scope = Trace.startSpan("restoreFailedStorage", traceSampler);
+ try {
+ return namenode.restoreFailedStorage(arg);
+ } finally {
+ scope.close();
+ }
}
/**
@@ -2687,7 +2855,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#refreshNodes()
*/
public void refreshNodes() throws IOException {
- namenode.refreshNodes();
+ TraceScope scope = Trace.startSpan("refreshNodes", traceSampler);
+ try {
+ namenode.refreshNodes();
+ } finally {
+ scope.close();
+ }
}
/**
@@ -2696,7 +2869,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#metaSave(String)
*/
public void metaSave(String pathname) throws IOException {
- namenode.metaSave(pathname);
+ TraceScope scope = Trace.startSpan("metaSave", traceSampler);
+ try {
+ namenode.metaSave(pathname);
+ } finally {
+ scope.close();
+ }
}
/**
@@ -2708,18 +2886,33 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#setBalancerBandwidth(long)
*/
public void setBalancerBandwidth(long bandwidth) throws IOException {
- namenode.setBalancerBandwidth(bandwidth);
+ TraceScope scope = Trace.startSpan("setBalancerBandwidth", traceSampler);
+ try {
+ namenode.setBalancerBandwidth(bandwidth);
+ } finally {
+ scope.close();
+ }
}
/**
* @see ClientProtocol#finalizeUpgrade()
*/
public void finalizeUpgrade() throws IOException {
- namenode.finalizeUpgrade();
+ TraceScope scope = Trace.startSpan("finalizeUpgrade", traceSampler);
+ try {
+ namenode.finalizeUpgrade();
+ } finally {
+ scope.close();
+ }
}
RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException {
- return namenode.rollingUpgrade(action);
+ TraceScope scope = Trace.startSpan("rollingUpgrade", traceSampler);
+ try {
+ return namenode.rollingUpgrade(action);
+ } finally {
+ scope.close();
+ }
}
/**
@@ -2776,6 +2969,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
if(LOG.isDebugEnabled()) {
LOG.debug(src + ": masked=" + absPermission);
}
+ TraceScope scope = Trace.startSpan("mkdir", traceSampler);
try {
return namenode.mkdirs(src, absPermission, createParent);
} catch(RemoteException re) {
@@ -2789,6 +2983,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -2799,12 +2995,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @see ClientProtocol#getContentSummary(String)
*/
ContentSummary getContentSummary(String src) throws IOException {
+ TraceScope scope = getPathTraceScope("getContentSummary", src);
try {
return namenode.getContentSummary(src);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
@@ -2824,6 +3023,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
diskspaceQuota);
}
+ TraceScope scope = getPathTraceScope("setQuota", src);
try {
namenode.setQuota(src, namespaceQuota, diskspaceQuota);
} catch(RemoteException re) {
@@ -2833,6 +3033,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DSQuotaExceededException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -2843,6 +3045,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*/
public void setTimes(String src, long mtime, long atime) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("setTimes", src);
try {
namenode.setTimes(src, mtime, atime);
} catch(RemoteException re) {
@@ -2850,6 +3053,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
FileNotFoundException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
+ } finally {
+ scope.close();
}
}
@@ -2901,6 +3106,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public void modifyAclEntries(String src, List<AclEntry> aclSpec)
throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("modifyAclEntries", src);
try {
namenode.modifyAclEntries(src, aclSpec);
} catch(RemoteException re) {
@@ -2911,12 +3117,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public void removeAclEntries(String src, List<AclEntry> aclSpec)
throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("removeAclEntries", traceSampler);
try {
namenode.removeAclEntries(src, aclSpec);
} catch(RemoteException re) {
@@ -2927,11 +3136,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public void removeDefaultAcl(String src) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("removeDefaultAcl", traceSampler);
try {
namenode.removeDefaultAcl(src);
} catch(RemoteException re) {
@@ -2942,11 +3154,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public void removeAcl(String src) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("removeAcl", traceSampler);
try {
namenode.removeAcl(src);
} catch(RemoteException re) {
@@ -2957,11 +3172,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
checkOpen();
+ TraceScope scope = Trace.startSpan("setAcl", traceSampler);
try {
namenode.setAcl(src, aclSpec);
} catch(RemoteException re) {
@@ -2972,11 +3190,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public AclStatus getAclStatus(String src) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getAclStatus", src);
try {
return namenode.getAclStatus(src);
} catch(RemoteException re) {
@@ -2984,41 +3205,50 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
AclException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public void createEncryptionZone(String src, String keyName)
throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("createEncryptionZone", src);
try {
namenode.createEncryptionZone(src, keyName);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
SafeModeException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public EncryptionZone getEZForPath(String src)
throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getEZForPath", src);
try {
return namenode.getEZForPath(src);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public RemoteIterator<EncryptionZone> listEncryptionZones()
throws IOException {
checkOpen();
- return new EncryptionZoneIterator(namenode);
+ return new EncryptionZoneIterator(namenode, traceSampler);
}
public void setXAttr(String src, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("setXAttr", src);
try {
namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag);
} catch (RemoteException re) {
@@ -3028,11 +3258,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public byte[] getXAttr(String src, String name) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getXAttr", src);
try {
final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name);
final List<XAttr> result = namenode.getXAttrs(src, xAttrs);
@@ -3041,23 +3274,29 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public Map<String, byte[]> getXAttrs(String src) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getXAttrs", src);
try {
return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null));
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public Map<String, byte[]> getXAttrs(String src, List<String> names)
throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("getXAttrs", src);
try {
return XAttrHelper.buildXAttrMap(namenode.getXAttrs(
src, XAttrHelper.buildXAttrs(names)));
@@ -3065,12 +3304,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public List<String> listXAttrs(String src)
throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("listXAttrs", src);
try {
final Map<String, byte[]> xattrs =
XAttrHelper.buildXAttrMap(namenode.listXAttrs(src));
@@ -3079,11 +3321,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public void removeXAttr(String src, String name) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("removeXAttr", src);
try {
namenode.removeXAttr(src, XAttrHelper.buildXAttr(name));
} catch(RemoteException re) {
@@ -3093,27 +3338,32 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
SafeModeException.class,
SnapshotAccessControlException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public void checkAccess(String src, FsAction mode) throws IOException {
checkOpen();
+ TraceScope scope = getPathTraceScope("checkAccess", src);
try {
namenode.checkAccess(src, mode);
} catch (RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
UnresolvedPathException.class);
+ } finally {
+ scope.close();
}
}
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
- return new DFSInotifyEventInputStream(namenode);
+ return new DFSInotifyEventInputStream(traceSampler, namenode);
}
public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
throws IOException {
- return new DFSInotifyEventInputStream(namenode, lastReadTxid);
+ return new DFSInotifyEventInputStream(traceSampler, namenode, lastReadTxid);
}
@Override // RemotePeerFactory
@@ -3232,4 +3482,24 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
return scope;
}
+
+ private static final byte[] SRC = "src".getBytes(Charset.forName("UTF-8"));
+
+ private static final byte[] DST = "dst".getBytes(Charset.forName("UTF-8"));
+
+ TraceScope getSrcDstTraceScope(String description, String src, String dst) {
+ TraceScope scope = Trace.startSpan(description, traceSampler);
+ Span span = scope.getSpan();
+ if (span != null) {
+ if (src != null) {
+ span.addKVAnnotation(SRC,
+ src.getBytes(Charset.forName("UTF-8")));
+ }
+ if (dst != null) {
+ span.addKVAnnotation(DST,
+ dst.getBytes(Charset.forName("UTF-8")));
+ }
+ }
+ return scope;
+ }
}
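Only the tail of the getPathTraceScope helper is visible in the hunk above (the lines ending at "return scope;"). Assuming it mirrors the getSrcDstTraceScope helper added right after it, it presumably looks roughly like the following sketch; the PATH constant is inferred by analogy with SRC and DST and is not shown in this diff:

    private static final byte[] PATH = "path".getBytes(Charset.forName("UTF-8"));

    TraceScope getPathTraceScope(String description, String path) {
      TraceScope scope = Trace.startSpan(description, traceSampler);
      Span span = scope.getSpan();
      if (span != null && path != null) {
        // Attach the path as a key/value annotation so spans can be
        // correlated with the file they touched.
        span.addKVAnnotation(PATH, path.getBytes(Charset.forName("UTF-8")));
      }
      return scope;
    }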
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
index 83b92b9..803e4f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +47,11 @@ public class DFSInotifyEventInputStream {
public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
.class);
+ /**
+ * The trace sampler to use when making RPCs to the NameNode.
+ */
+ private final Sampler<?> traceSampler;
+
private final ClientProtocol namenode;
private Iterator<EventBatch> it;
private long lastReadTxid;
@@ -59,12 +67,15 @@ public class DFSInotifyEventInputStream {
private static final int INITIAL_WAIT_MS = 10;
- DFSInotifyEventInputStream(ClientProtocol namenode) throws IOException {
- this(namenode, namenode.getCurrentEditLogTxid()); // only consider new txn's
+ DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
+ throws IOException {
+ // Only consider new transaction IDs.
+ this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
}
- DFSInotifyEventInputStream(ClientProtocol namenode, long lastReadTxid)
- throws IOException {
+ DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode,
+ long lastReadTxid) throws IOException {
+ this.traceSampler = traceSampler;
this.namenode = namenode;
this.it = Iterators.emptyIterator();
this.lastReadTxid = lastReadTxid;
@@ -87,39 +98,45 @@ public class DFSInotifyEventInputStream {
* The next available batch of events will be returned.
*/
public EventBatch poll() throws IOException, MissingEventsException {
- // need to keep retrying until the NN sends us the latest committed txid
- if (lastReadTxid == -1) {
- LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
- lastReadTxid = namenode.getCurrentEditLogTxid();
- return null;
- }
- if (!it.hasNext()) {
- EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
- if (el.getLastTxid() != -1) {
- // we only want to set syncTxid when we were actually able to read some
- // edits on the NN -- otherwise it will seem like edits are being
- // generated faster than we can read them when the problem is really
- // that we are temporarily unable to read edits
- syncTxid = el.getSyncTxid();
- it = el.getBatches().iterator();
- long formerLastReadTxid = lastReadTxid;
- lastReadTxid = el.getLastTxid();
- if (el.getFirstTxid() != formerLastReadTxid + 1) {
- throw new MissingEventsException(formerLastReadTxid + 1,
- el.getFirstTxid());
+ TraceScope scope =
+ Trace.startSpan("inotifyPoll", traceSampler);
+ try {
+ // need to keep retrying until the NN sends us the latest committed txid
+ if (lastReadTxid == -1) {
+ LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
+ lastReadTxid = namenode.getCurrentEditLogTxid();
+ return null;
+ }
+ if (!it.hasNext()) {
+ EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
+ if (el.getLastTxid() != -1) {
+ // we only want to set syncTxid when we were actually able to read some
+ // edits on the NN -- otherwise it will seem like edits are being
+ // generated faster than we can read them when the problem is really
+ // that we are temporarily unable to read edits
+ syncTxid = el.getSyncTxid();
+ it = el.getBatches().iterator();
+ long formerLastReadTxid = lastReadTxid;
+ lastReadTxid = el.getLastTxid();
+ if (el.getFirstTxid() != formerLastReadTxid + 1) {
+ throw new MissingEventsException(formerLastReadTxid + 1,
+ el.getFirstTxid());
+ }
+ } else {
+ LOG.debug("poll(): read no edits from the NN when requesting edits " +
+ "after txid {}", lastReadTxid);
+ return null;
}
+ }
+
+ if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
+ // newly seen edit log ops actually got converted to events
+ return it.next();
} else {
- LOG.debug("poll(): read no edits from the NN when requesting edits " +
- "after txid {}", lastReadTxid);
return null;
}
- }
-
- if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
- // newly seen edit log ops actually got converted to events
- return it.next();
- } else {
- return null;
+ } finally {
+ scope.close();
}
}
@@ -163,25 +180,29 @@ public class DFSInotifyEventInputStream {
*/
public EventBatch poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException {
- long initialTime = Time.monotonicNow();
- long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
- long nextWait = INITIAL_WAIT_MS;
+ TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
EventBatch next = null;
- while ((next = poll()) == null) {
- long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
- if (timeLeft <= 0) {
- LOG.debug("timed poll(): timed out");
- break;
- } else if (timeLeft < nextWait * 2) {
- nextWait = timeLeft;
- } else {
- nextWait *= 2;
+ try {
+ long initialTime = Time.monotonicNow();
+ long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
+ long nextWait = INITIAL_WAIT_MS;
+ while ((next = poll()) == null) {
+ long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
+ if (timeLeft <= 0) {
+ LOG.debug("timed poll(): timed out");
+ break;
+ } else if (timeLeft < nextWait * 2) {
+ nextWait = timeLeft;
+ } else {
+ nextWait *= 2;
+ }
+ LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
+ nextWait);
+ Thread.sleep(nextWait);
}
- LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
- nextWait);
- Thread.sleep(nextWait);
+ } finally {
+ scope.close();
}
-
return next;
}
@@ -196,18 +217,23 @@ public class DFSInotifyEventInputStream {
*/
public EventBatch take() throws IOException, InterruptedException,
MissingEventsException {
+ TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
EventBatch next = null;
- int nextWaitMin = INITIAL_WAIT_MS;
- while ((next = poll()) == null) {
- // sleep for a random period between nextWaitMin and nextWaitMin * 2
- // to avoid stampedes at the NN if there are multiple clients
- int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
- LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
- Thread.sleep(sleepTime);
- // the maximum sleep is 2 minutes
- nextWaitMin = Math.min(60000, nextWaitMin * 2);
+ try {
+ int nextWaitMin = INITIAL_WAIT_MS;
+ while ((next = poll()) == null) {
+ // sleep for a random period between nextWaitMin and nextWaitMin * 2
+ // to avoid stampedes at the NN if there are multiple clients
+ int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
+ LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
+ Thread.sleep(sleepTime);
+ // the maximum sleep is 2 minutes
+ nextWaitMin = Math.min(60000, nextWaitMin * 2);
+ }
+ } finally {
+ scope.close();
}
return next;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
index 676106d..d28b771 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.base.Preconditions;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
* CacheDirectiveIterator is a remote iterator that iterates cache directives.
@@ -39,12 +42,14 @@ public class CacheDirectiveIterator
private CacheDirectiveInfo filter;
private final ClientProtocol namenode;
+ private final Sampler<?> traceSampler;
public CacheDirectiveIterator(ClientProtocol namenode,
- CacheDirectiveInfo filter) {
+ CacheDirectiveInfo filter, Sampler<?> traceSampler) {
super(0L);
this.namenode = namenode;
this.filter = filter;
+ this.traceSampler = traceSampler;
}
private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) {
@@ -89,6 +94,7 @@ public class CacheDirectiveIterator
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
throws IOException {
BatchedEntries<CacheDirectiveEntry> entries = null;
+ TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler);
try {
entries = namenode.listCacheDirectives(prevKey, filter);
} catch (IOException e) {
@@ -110,6 +116,8 @@ public class CacheDirectiveIterator
"Did not find requested id " + id);
}
throw e;
+ } finally {
+ scope.close();
}
Preconditions.checkNotNull(entries);
return entries;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java
index 44d6b45..1f17c8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java
@@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
* CachePoolIterator is a remote iterator that iterates cache pools.
@@ -34,16 +37,23 @@ public class CachePoolIterator
extends BatchedRemoteIterator<String, CachePoolEntry> {
private final ClientProtocol namenode;
+ private final Sampler traceSampler;
- public CachePoolIterator(ClientProtocol namenode) {
+ public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) {
super("");
this.namenode = namenode;
+ this.traceSampler = traceSampler;
}
@Override
public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
throws IOException {
- return namenode.listCachePools(prevKey);
+ TraceScope scope = Trace.startSpan("listCachePools", traceSampler);
+ try {
+ return namenode.listCachePools(prevKey);
+ } finally {
+ scope.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
index b8c21b0..8a648e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java
@@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.htrace.Sampler;
+import org.htrace.Trace;
+import org.htrace.TraceScope;
/**
* EncryptionZoneIterator is a remote iterator that iterates over encryption
@@ -34,16 +37,24 @@ public class EncryptionZoneIterator
extends BatchedRemoteIterator<Long, EncryptionZone> {
private final ClientProtocol namenode;
+ private final Sampler<?> traceSampler;
- public EncryptionZoneIterator(ClientProtocol namenode) {
+ public EncryptionZoneIterator(ClientProtocol namenode,
+ Sampler<?> traceSampler) {
super(Long.valueOf(0));
this.namenode = namenode;
+ this.traceSampler = traceSampler;
}
@Override
public BatchedEntries<EncryptionZone> makeRequest(Long prevId)
throws IOException {
- return namenode.listEncryptionZones(prevId);
+ TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler);
+ try {
+ return namenode.listEncryptionZones(prevId);
+ } finally {
+ scope.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdbf13ac/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
index 9307692..70410ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.util.GSet;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.htrace.Sampler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -908,7 +909,7 @@ public class TestCacheDirectives {
// Uncache and check each path in sequence
RemoteIterator<CacheDirectiveEntry> entries =
- new CacheDirectiveIterator(nnRpc, null);
+ new CacheDirectiveIterator(nnRpc, null, Sampler.NEVER);
for (int i=0; i<numFiles; i++) {
CacheDirectiveEntry entry = entries.next();
nnRpc.removeCacheDirective(entry.getInfo().getId());