Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/04 22:04:26 UTC

[07/50] [abbrv] hadoop git commit: HDFS-8280. Code Cleanup in DFSInputStream. Contributed by Jing Zhao.

HDFS-8280. Code Cleanup in DFSInputStream. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a742c10e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a742c10e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a742c10e

Branch: refs/heads/YARN-2928
Commit: a742c10e77a1a4ac8a272d722de491ad9c5e7738
Parents: be804ef
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Apr 28 18:11:59 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon May 4 12:58:53 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 141 ++++++++-----------
 2 files changed, 61 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a742c10e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1c4cfb4..e7fa8fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -478,6 +478,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8176. Record from/to snapshots in audit log for snapshot diff report.
     (J. Andreina via jing9)
 
+    HDFS-8280. Code Cleanup in DFSInputStream. (Jing Zhao via wheat9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

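The DFSInputStream diff below removes two copies of the same
BlockReaderFactory builder chain (one in blockSeekTo(), one in
fetchBlockByteRange()) and routes both read paths through a single
getBlockReader() helper. A minimal standalone sketch of the shape of that
refactoring, using stand-in types rather than the real HDFS classes:

  // Stand-ins; illustrative only, not the HDFS classes themselves.
  class Reader {}

  class ReaderFactory {
    ReaderFactory setFile(String file)     { return this; }
    ReaderFactory setStartOffset(long off) { return this; }
    ReaderFactory setLength(long len)      { return this; }
    Reader build()                         { return new Reader(); }
  }

  class Client {
    // One helper owns the long builder chain; the call sites pass in
    // only the values that actually differ between the two read paths.
    Reader getReader(String file, long offset, long length) {
      return new ReaderFactory()
          .setFile(file)
          .setStartOffset(offset)
          .setLength(length)
          .build();
    }
  }
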
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a742c10e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3f90397..3290223 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -601,7 +601,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             targetBlock.getBlockSize() - 1;
       this.currentLocatedBlock = targetBlock;
 
-      assert (target==pos) : "Wrong postion " + pos + " expect " + target;
       long offsetIntoBlock = target - targetBlock.getStartOffset();
 
       DNAddrPair retval = chooseDataNode(targetBlock, null);
@@ -610,35 +609,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       StorageType storageType = retval.storageType;
 
       try {
-        ExtendedBlock blk = targetBlock.getBlock();
-        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-        CachingStrategy curCachingStrategy;
-        boolean shortCircuitForbidden;
-        synchronized(infoLock) {
-          curCachingStrategy = cachingStrategy;
-          shortCircuitForbidden = shortCircuitForbidden();
-        }
-        blockReader = new BlockReaderFactory(dfsClient.getConf()).
-            setInetSocketAddress(targetAddr).
-            setRemotePeerFactory(dfsClient).
-            setDatanodeInfo(chosenNode).
-            setStorageType(storageType).
-            setFileName(src).
-            setBlock(blk).
-            setBlockToken(accessToken).
-            setStartOffset(offsetIntoBlock).
-            setVerifyChecksum(verifyChecksum).
-            setClientName(dfsClient.clientName).
-            setLength(blk.getNumBytes() - offsetIntoBlock).
-            setCachingStrategy(curCachingStrategy).
-            setAllowShortCircuitLocalReads(!shortCircuitForbidden).
-            setClientCacheContext(dfsClient.getClientContext()).
-            setUserGroupInformation(dfsClient.ugi).
-            setConfiguration(dfsClient.getConfiguration()).
-            build();
+        blockReader = getBlockReader(targetBlock, offsetIntoBlock,
+            targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
+            storageType, chosenNode);
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
-                             " for " + blk);
+                             " for " + targetBlock.getBlock());
         }
         return chosenNode;
       } catch (IOException ex) {
@@ -663,6 +639,37 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
+  protected BlockReader getBlockReader(LocatedBlock targetBlock,
+      long offsetInBlock, long length, InetSocketAddress targetAddr,
+      StorageType storageType, DatanodeInfo datanode) throws IOException {
+    ExtendedBlock blk = targetBlock.getBlock();
+    Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+    CachingStrategy curCachingStrategy;
+    boolean shortCircuitForbidden;
+    synchronized (infoLock) {
+      curCachingStrategy = cachingStrategy;
+      shortCircuitForbidden = shortCircuitForbidden();
+    }
+    return new BlockReaderFactory(dfsClient.getConf()).
+        setInetSocketAddress(targetAddr).
+        setRemotePeerFactory(dfsClient).
+        setDatanodeInfo(datanode).
+        setStorageType(storageType).
+        setFileName(src).
+        setBlock(blk).
+        setBlockToken(accessToken).
+        setStartOffset(offsetInBlock).
+        setVerifyChecksum(verifyChecksum).
+        setClientName(dfsClient.clientName).
+        setLength(length).
+        setCachingStrategy(curCachingStrategy).
+        setAllowShortCircuitLocalReads(!shortCircuitForbidden).
+        setClientCacheContext(dfsClient.getClientContext()).
+        setUserGroupInformation(dfsClient.ugi).
+        setConfiguration(dfsClient.getConfiguration()).
+        build();
+  }
+
   /**
    * Close it down!
    */
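For reference, the two call sites that now share this helper, condensed
from the hunks in this diff (identifiers exactly as they appear in the
surrounding hunks; not a standalone compile unit):

  // In blockSeekTo(): read from the seek position to the end of the block.
  blockReader = getBlockReader(targetBlock, offsetIntoBlock,
      targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
      storageType, chosenNode);

  // In fetchBlockByteRange(): read exactly the requested byte range.
  reader = getBlockReader(block, start, len, datanode.addr,
      datanode.storageType, datanode.info);
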
@@ -935,9 +942,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
-      try {
-        return getBestNodeDNAddrPair(block, ignoredNodes);
-      } catch (IOException ie) {
+      DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
+      if (result != null) {
+        return result;
+      } else {
         String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
           deadNodes, ignoredNodes);
         String blockInfo = block.getBlock() + " file=" + src;
@@ -954,7 +962,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           DFSClient.LOG.info("No node available for " + blockInfo);
         }
         DFSClient.LOG.info("Could not obtain " + block.getBlock()
-            + " from any node: " + ie + errMsg
+            + " from any node: " + errMsg
             + ". Will get new block locations from namenode and retry...");
         try {
           // Introducing a random factor to the wait time before another retry.
@@ -977,7 +985,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         openInfo();
         block = getBlockAt(block.getStartOffset());
         failures++;
-        continue;
       }
     }
   }
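chooseDataNode() above now drives its retry loop off a null check instead
of catching an IOException from getBestNodeDNAddrPair(). A standalone
sketch of that control-flow change with stand-in types (the real loop also
refetches block locations from the namenode and sleeps with a randomized
backoff before retrying):

  import java.io.IOException;
  import java.util.function.Supplier;

  final class RetryLoop {
    // The picker signals "nothing usable" with null, so the caller
    // retries on an ordinary branch instead of an exception handler.
    static <T> T chooseWithRetry(Supplier<T> pickBestNode,
        Runnable refetchLocations, int maxRetries) throws IOException {
      int failures = 0;
      while (true) {
        T node = pickBestNode.get();  // null: no live node right now
        if (node != null) {
          return node;
        }
        if (failures >= maxRetries) {
          throw new IOException("Could not obtain block from any node");
        }
        refetchLocations.run();       // ask the namenode again, then retry
        failures++;
      }
    }
  }
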
@@ -986,11 +993,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Get the best node from which to stream the data.
    * @param block LocatedBlock, containing nodes in priority order.
    * @param ignoredNodes Do not choose nodes in this array (may be null)
-   * @return The DNAddrPair of the best node.
-   * @throws IOException
+   * @return The DNAddrPair of the best node. Null if no node can be chosen.
    */
   private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
-      Collection<DatanodeInfo> ignoredNodes) throws IOException {
+      Collection<DatanodeInfo> ignoredNodes) {
     DatanodeInfo[] nodes = block.getLocations();
     StorageType[] storageTypes = block.getStorageTypes();
     DatanodeInfo chosenNode = null;
@@ -1010,9 +1016,10 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       }
     }
     if (chosenNode == null) {
-      throw new IOException("No live nodes contain block " + block.getBlock() +
+      DFSClient.LOG.warn("No live nodes contain block " + block.getBlock() +
           " after checking nodes = " + Arrays.toString(nodes) +
           ", ignoredNodes = " + ignoredNodes);
+      return null;
     }
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
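This is the callee side of the same cleanup: when every location is dead
or ignored, getBestNodeDNAddrPair() now logs a warning and returns null
rather than constructing and throwing an IOException. A standalone sketch
of the new contract, with String standing in for DatanodeInfo:

  import java.util.Arrays;
  import java.util.Collection;
  import java.util.Set;

  final class NodePicker {
    // Locations arrive in priority order; take the first one that is
    // neither known-dead nor explicitly ignored, else return null.
    static String pickFirstLiveNode(String[] nodes, Set<String> deadNodes,
        Collection<String> ignoredNodes) {
      for (String node : nodes) {
        boolean dead = deadNodes.contains(node);
        boolean ignored = ignoredNodes != null && ignoredNodes.contains(node);
        if (!dead && !ignored) {
          return node;
        }
      }
      System.err.println("No live nodes contain block after checking"
          + " nodes = " + Arrays.toString(nodes)
          + ", ignoredNodes = " + ignoredNodes);
      return null;  // was: throw new IOException(...)
    }
  }
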
@@ -1102,40 +1109,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      CachingStrategy curCachingStrategy;
-      boolean allowShortCircuitLocalReads;
       LocatedBlock block = getBlockAt(blockStartOffset);
-      synchronized(infoLock) {
-        curCachingStrategy = cachingStrategy;
-        allowShortCircuitLocalReads = !shortCircuitForbidden();
-      }
-      DatanodeInfo chosenNode = datanode.info;
-      InetSocketAddress targetAddr = datanode.addr;
-      StorageType storageType = datanode.storageType;
       BlockReader reader = null;
-
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
-        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
         int len = (int) (end - start + 1);
-        reader = new BlockReaderFactory(dfsClient.getConf()).
-            setInetSocketAddress(targetAddr).
-            setRemotePeerFactory(dfsClient).
-            setDatanodeInfo(chosenNode).
-            setStorageType(storageType).
-            setFileName(src).
-            setBlock(block.getBlock()).
-            setBlockToken(blockToken).
-            setStartOffset(start).
-            setVerifyChecksum(verifyChecksum).
-            setClientName(dfsClient.clientName).
-            setLength(len).
-            setCachingStrategy(curCachingStrategy).
-            setAllowShortCircuitLocalReads(allowShortCircuitLocalReads).
-            setClientCacheContext(dfsClient.getClientContext()).
-            setUserGroupInformation(dfsClient.ugi).
-            setConfiguration(dfsClient.getConfiguration()).
-            build();
+        reader = getBlockReader(block, start, len, datanode.addr,
+            datanode.storageType, datanode.info);
         int nread = reader.readAll(buf, offset, len);
         updateReadStatistics(readStatistics, nread, reader);
 
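The hunk above also drops the chosenNode, targetAddr and storageType
local aliases and reads the DNAddrPair fields directly. A stand-in sketch
of that holder (field names match the diff; the constructor shown here is
assumed for illustration, not copied from DFSInputStream):

  // One immutable holder for everything a read path needs to know
  // about the datanode that was chosen for it.
  final class DNAddrPair {
    final DatanodeInfo info;        // which datanode was chosen
    final InetSocketAddress addr;   // where to connect
    final StorageType storageType;  // what media the replica lives on
    DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
        StorageType storageType) {
      this.info = info;
      this.addr = addr;
      this.storageType = storageType;
    }
  }
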
@@ -1148,34 +1128,33 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       } catch (ChecksumException e) {
         String msg = "fetchBlockByteRange(). Got a checksum exception for "
             + src + " at " + block.getBlock() + ":" + e.getPos() + " from "
-            + chosenNode;
+            + datanode.info;
         DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
-        addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
-        addToDeadNodes(chosenNode);
+        addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
+            corruptedBlockMap);
+        addToDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
-              + "encryption key was invalid when connecting to " + targetAddr
+              + "encryption key was invalid when connecting to " + datanode.addr
               + " : " + e);
           // The encryption key used is invalid.
           refetchEncryptionKey--;
           dfsClient.clearDataEncryptionKey();
-          continue;
-        } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
+        } else if (refetchToken > 0 && tokenRefetchNeeded(e, datanode.addr)) {
           refetchToken--;
           try {
             fetchBlockAt(block.getStartOffset());
           } catch (IOException fbae) {
             // ignore IOE, since we can retry it later in a loop
           }
-          continue;
         } else {
-          String msg = "Failed to connect to " + targetAddr + " for file "
+          String msg = "Failed to connect to " + datanode.addr + " for file "
               + src + " for block " + block.getBlock() + ":" + e;
           DFSClient.LOG.warn("Connection failure: " + msg, e);
-          addToDeadNodes(chosenNode);
+          addToDeadNodes(datanode.info);
           throw new IOException(msg);
         }
       } finally {
@@ -1187,10 +1166,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
-   * Like {@link #fetchBlockByteRange(LocatedBlock, long, long, byte[],
-   * int, Map)} except we start up a second, parallel, 'hedged' read
-   * if the first read is taking longer than configured amount of
-   * time.  We then wait on which ever read returns first.
+   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+   * 'hedged' read if the first read is taking longer than configured amount of
+   * time. We then wait on which ever read returns first.
    */
   private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
       long end, byte[] buf, int offset,
@@ -1248,9 +1226,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
         // If no nodes to do hedged reads against, pass.
         try {
-          try {
-            chosenNode = getBestNodeDNAddrPair(block, ignored);
-          } catch (IOException ioe) {
+          chosenNode = getBestNodeDNAddrPair(block, ignored);
+          if (chosenNode == null) {
             chosenNode = chooseDataNode(block, ignored);
           }
           bb = ByteBuffer.allocate(len);
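
The null contract is also what this last hunk relies on: the hedged path
first tries getBestNodeDNAddrPair() against the nodes it has not already
hedged, and only falls back to the blocking chooseDataNode() retry loop
when that returns null. Condensed from the hunk above (identifiers from
the diff; not a standalone compile unit):

  // Prefer a node we have not already started a read against; if none
  // is left, fall back to chooseDataNode(), which waits and refetches.
  chosenNode = getBestNodeDNAddrPair(block, ignored);
  if (chosenNode == null) {
    chosenNode = chooseDataNode(block, ignored);
  }

Hedged reads are off by default; a minimal client-side sketch of enabling
them (these two keys are the standard HDFS hedged-read client settings,
not something this commit adds):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;

  public class HedgedReadExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // A thread pool size > 0 turns hedged reads on for this client.
      conf.setInt("dfs.client.hedged.read.threadpool.size", 5);
      // Fire the second, hedged read if the first has not returned
      // within this many milliseconds.
      conf.setLong("dfs.client.hedged.read.threshold.millis", 500);
      try (FileSystem fs = FileSystem.get(conf)) {
        // Positional reads through fs may now be hedged.
      }
    }
  }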