Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/05/04 19:58:20 UTC

[46/50] hadoop git commit: HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream (stateful read). Contributed by Jing Zhao

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aae54522
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aae54522
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aae54522

Branch: refs/heads/HDFS-7285
Commit: aae54522e29e774e923bc1c62a78a432bd5fabb3
Parents: 0023b10
Author: Zhe Zhang <zh...@apache.org>
Authored: Wed Apr 29 15:53:31 2015 -0700
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon May 4 10:13:31 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |   3 +
 .../hadoop/hdfs/DFSStripedInputStream.java      | 336 ++++++++-----------
 2 files changed, 150 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
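
In short: the commit replaces the old combined seek-and-retry loop in
blockSeekTo() with a small ReaderRetryPolicy object plus a per-reader
helper, getBlockReaderWithRetry(). A minimal, self-contained sketch of
the policy (the class body below mirrors the one in the diff; the main()
driver is a hypothetical illustration, not HDFS code):

    public class ReaderRetryPolicySketch {
      // One refetch budget per recoverable failure kind, shared across all
      // internal-block readers of a striped block group.
      static class ReaderRetryPolicy {
        private int fetchEncryptionKeyTimes = 1;
        private int fetchTokenTimes = 1;

        void refetchEncryptionKey() { fetchEncryptionKeyTimes--; }
        void refetchToken() { fetchTokenTimes--; }

        boolean shouldRefetchEncryptionKey() { return fetchEncryptionKeyTimes > 0; }
        boolean shouldRefetchToken() { return fetchTokenTimes > 0; }
      }

      public static void main(String[] args) {
        ReaderRetryPolicy retry = new ReaderRetryPolicy();
        // The first stale-token failure is allowed one refetch...
        if (retry.shouldRefetchToken()) {
          retry.refetchToken();
        }
        // ...a second one within the same block group is not retried.
        System.out.println("retry token again? " + retry.shouldRefetchToken()); // false
      }
    }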


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aae54522/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 9b4bf24..6a9bdee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -143,3 +143,6 @@
 
     HDFS-8235. Erasure Coding: Create DFSStripedInputStream in DFSClient#open.
     (Kai Sasaki via jing9)
+
+    HDFS-8272. Erasure Coding: simplify the retry logic in DFSStripedInputStream 
+    (stateful read). (Jing Zhao via Zhe Zhang)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aae54522/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index f6f7ed2..3da7306 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.token.Token;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
 import org.apache.htrace.TraceScope;
@@ -126,23 +123,42 @@ public class DFSStripedInputStream extends DFSInputStream {
     return results;
   }
 
+  private static class ReaderRetryPolicy {
+    private int fetchEncryptionKeyTimes = 1;
+    private int fetchTokenTimes = 1;
+
+    void refetchEncryptionKey() {
+      fetchEncryptionKeyTimes--;
+    }
+
+    void refetchToken() {
+      fetchTokenTimes--;
+    }
+
+    boolean shouldRefetchEncryptionKey() {
+      return fetchEncryptionKeyTimes > 0;
+    }
+
+    boolean shouldRefetchToken() {
+      return fetchTokenTimes > 0;
+    }
+  }
+
   private final short groupSize = HdfsConstants.NUM_DATA_BLOCKS;
-  private BlockReader[] blockReaders = null;
-  private DatanodeInfo[] currentNodes = null;
+  private final BlockReader[] blockReaders = new BlockReader[groupSize];
+  private final DatanodeInfo[] currentNodes = new DatanodeInfo[groupSize];
   private final int cellSize;
   private final short dataBlkNum;
   private final short parityBlkNum;
-  private final ECInfo ecInfo;
 
-  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, ECInfo info)
-      throws IOException {
+  DFSStripedInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+      ECInfo ecInfo) throws IOException {
     super(dfsClient, src, verifyChecksum);
     // ECInfo is restored from NN just before reading a striped file.
-    assert info != null;
-    ecInfo = info;
+    assert ecInfo != null;
     cellSize = ecInfo.getSchema().getChunkSize();
-    dataBlkNum = (short)ecInfo.getSchema().getNumDataUnits();
-    parityBlkNum = (short)ecInfo.getSchema().getNumParityUnits();
+    dataBlkNum = (short) ecInfo.getSchema().getNumDataUnits();
+    parityBlkNum = (short) ecInfo.getSchema().getNumParityUnits();
     DFSClient.LOG.debug("Creating a striped input stream for file " + src);
   }
 
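
For orientation: cellSize, dataBlkNum and parityBlkNum are now read
straight from the file's ECSchema in the constructor above, and together
they determine which internal block serves a given file offset. A hedged
sketch of that mapping, using the same arithmetic as read() further down
(the 4-byte cells and 3 data blocks are made-up numbers for illustration,
not the branch defaults):

    public class StripeGeometrySketch {
      public static void main(String[] args) {
        final int cellSize = 4;      // stands in for schema.getChunkSize()
        final short dataBlkNum = 3;  // stands in for schema.getNumDataUnits()

        // Same mapping the stateful read uses: file offset -> internal block.
        for (long pos = 0; pos < 24; pos += cellSize) {
          int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
          long cellStart = (pos / cellSize) * cellSize;
          long cellEnd = cellStart + cellSize - 1;
          System.out.println("offset " + pos + " -> block #" + idxInGroup
              + ", cell [" + cellStart + ", " + cellEnd + "]");
        }
      }
    }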
@@ -162,9 +178,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    * When seeking into a new block group, create blockReader for each internal
    * block in the group.
    */
-  @VisibleForTesting
-  private synchronized DatanodeInfo[] blockSeekTo(long target)
-      throws IOException {
+  private synchronized void blockSeekTo(long target) throws IOException {
     if (target >= getFileLength()) {
       throw new IOException("Attempted to read past end of file");
     }
@@ -172,18 +186,13 @@ public class DFSStripedInputStream extends DFSInputStream {
     // Will be getting a new BlockReader.
     closeCurrentBlockReaders();
 
-    // Connect to best DataNode for desired Block, with potential offset
-    DatanodeInfo[] chosenNodes = new DatanodeInfo[groupSize];
-    int refetchToken = 1; // only need to get a new access token once
-    int refetchEncryptionKey = 1; // only need to get a new encryption key once
-
     // Compute desired striped block group
     LocatedStripedBlock targetBlockGroup = getBlockGroupAt(target);
-
     // Update current position
     this.pos = target;
     this.blockEnd = targetBlockGroup.getStartOffset() +
         targetBlockGroup.getBlockSize() - 1;
+    currentLocatedBlock = targetBlockGroup;
 
     long offsetIntoBlockGroup = target - targetBlockGroup.getStartOffset();
     LocatedBlock[] targetBlocks = StripedBlockUtil.parseStripedBlockGroup(
@@ -191,71 +200,50 @@ public class DFSStripedInputStream extends DFSInputStream {
     // The purpose is to get start offset into each block
     ReadPortion[] readPortions = planReadPortions(groupSize, cellSize,
         offsetIntoBlockGroup, 0, 0);
+
+    final ReaderRetryPolicy retry = new ReaderRetryPolicy();
+    for (int i = 0; i < groupSize; i++) {
+      LocatedBlock targetBlock = targetBlocks[i];
+      if (targetBlock != null) {
+        DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
+        if (retval != null) {
+          currentNodes[i] = retval.info;
+          blockReaders[i] = getBlockReaderWithRetry(targetBlock,
+              readPortions[i].startOffsetInBlock,
+              targetBlock.getBlockSize() - readPortions[i].startOffsetInBlock,
+              retval.addr, retval.storageType, retval.info, target, retry);
+        }
+      }
+    }
+  }
+
+  private BlockReader getBlockReaderWithRetry(LocatedBlock targetBlock,
+      long offsetInBlock, long length, InetSocketAddress targetAddr,
+      StorageType storageType, DatanodeInfo datanode, long offsetInFile,
+      ReaderRetryPolicy retry) throws IOException {
+    // only need to get a new access token or a new encryption key once
     while (true) {
-      int i = 0;
-      InetSocketAddress targetAddr = null;
       try {
-        blockReaders = new BlockReader[groupSize];
-        for (i = 0; i < groupSize; i++) {
-          LocatedBlock targetBlock = targetBlocks[i];
-          if (targetBlock == null) {
-            continue;
-          }
-          long offsetIntoBlock = readPortions[i].startOffsetInBlock;
-          DNAddrPair retval = getBestNodeDNAddrPair(targetBlock, null);
-          chosenNodes[i] = retval.info;
-          targetAddr = retval.addr;
-          StorageType storageType = retval.storageType;
-
-          ExtendedBlock blk = targetBlock.getBlock();
-          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
-          CachingStrategy curCachingStrategy;
-          boolean shortCircuitForbidden;
-          synchronized(infoLock) {
-            curCachingStrategy = cachingStrategy;
-            shortCircuitForbidden = shortCircuitForbidden();
-          }
-          blockReaders[i] = new BlockReaderFactory(dfsClient.getConf()).
-              setInetSocketAddress(targetAddr).
-              setRemotePeerFactory(dfsClient).
-              setDatanodeInfo(chosenNodes[i]).
-              setStorageType(storageType).
-              setFileName(src).
-              setBlock(blk).
-              setBlockToken(accessToken).
-              setStartOffset(offsetIntoBlock).
-              setVerifyChecksum(verifyChecksum).
-              setClientName(dfsClient.clientName).
-              setLength(blk.getNumBytes() - offsetIntoBlock).
-              setCachingStrategy(curCachingStrategy).
-              setAllowShortCircuitLocalReads(!shortCircuitForbidden).
-              setClientCacheContext(dfsClient.getClientContext()).
-              setUserGroupInformation(dfsClient.ugi).
-              setConfiguration(dfsClient.getConfiguration()).
-              build();
-        }
-        currentLocatedBlock = targetBlockGroup;
-        return chosenNodes;
-      } catch (IOException ex) {
-        // Retry in case of encryption key or token exceptions. Otherwise throw
-        // IOException: since each internal block is singly replicated, it's
-        // not meaningful trying to locate another replica.
-        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
+        return getBlockReader(targetBlock, offsetInBlock, length, targetAddr,
+            storageType, datanode);
+      } catch (IOException e) {
+        if (e instanceof InvalidEncryptionKeyException &&
+            retry.shouldRefetchEncryptionKey()) {
           DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
-              + " : " + ex);
-          // The encryption key used is invalid.
-          refetchEncryptionKey--;
+              + " : " + e);
           dfsClient.clearDataEncryptionKey();
-        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
-          refetchToken--;
-          fetchBlockAt(target);
+          retry.refetchEncryptionKey();
+        } else if (retry.shouldRefetchToken() &&
+            tokenRefetchNeeded(e, targetAddr)) {
+          fetchBlockAt(offsetInFile);
+          retry.refetchToken();
         } else {
           DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
-              + ", add to deadNodes and continue. " + ex, ex);
-          // Put chosen node into dead list and throw exception
-          addToDeadNodes(chosenNodes[i]);
-          throw ex;
+              + ", add to deadNodes and continue.", e);
+          // Put chosen node into dead list, continue
+          addToDeadNodes(datanode);
+          return null;
         }
       }
     }
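
The hunk above is the heart of the change: retries now live in one helper
with one shared budget, and a hard failure no longer aborts the whole
block group. A self-contained sketch of that control flow (Reader, Node
and every helper below are hypothetical stand-ins for the DFSClient
machinery, not the real API):

    import java.io.IOException;

    public class ReaderRetrySketch {
      interface Reader {}
      interface Node {}

      // One shared refetch budget per block group, as in ReaderRetryPolicy.
      static class RetryBudget {
        int keyRefetches = 1;
        int tokenRefetches = 1;
      }

      // Hypothetical stubs for the client calls used in the real method.
      static Reader openReader(Node n) throws IOException { return new Reader() {}; }
      static boolean isInvalidKey(IOException e) { return false; }
      static boolean tokenRefetchNeeded(IOException e) { return false; }
      static void clearEncryptionKey() {}
      static void refetchBlock() throws IOException {}
      static void markDead(Node n) {}

      static Reader getReaderWithRetry(Node node, RetryBudget retry)
          throws IOException {
        while (true) {
          try {
            return openReader(node);
          } catch (IOException e) {
            if (isInvalidKey(e) && retry.keyRefetches > 0) {
              clearEncryptionKey();   // force a fresh data encryption key
              retry.keyRefetches--;
            } else if (tokenRefetchNeeded(e) && retry.tokenRefetches > 0) {
              refetchBlock();         // refreshed block carries a new token
              retry.tokenRefetches--;
            } else {
              // Each internal block is singly replicated: no other replica
              // to try, so mark the node dead and leave this reader null.
              markDead(node);
              return null;
            }
          }
        }
      }
    }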
@@ -272,15 +260,15 @@ public class DFSStripedInputStream extends DFSInputStream {
       return;
     }
     for (int i = 0; i < groupSize; i++) {
-      if (blockReaders[i] == null) {
-        continue;
-      }
-      try {
-        blockReaders[i].close();
-      } catch (IOException e) {
-        DFSClient.LOG.error("error closing blockReader", e);
+      if (blockReaders[i] != null) {
+        try {
+          blockReaders[i].close();
+        } catch (IOException e) {
+          DFSClient.LOG.error("error closing blockReader", e);
+        }
+        blockReaders[i] = null;
       }
-      blockReaders[i] = null;
+      currentNodes[i] = null;
     }
     blockEnd = -1;
   }
@@ -292,123 +280,93 @@ public class DFSStripedInputStream extends DFSInputStream {
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
-    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap
-        = new HashMap<>();
+    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
     failures = 0;
     if (pos < getFileLength()) {
-      int retries = 2;
       /** Index of the target block in a stripe to read from */
       int idxInGroup = (int) ((pos / cellSize) % dataBlkNum);
-      while (retries > 0) {
-        try {
-          // currentNode can be left as null if previous read had a checksum
-          // error on the same block. See HDFS-3067
-          if (pos > blockEnd || currentNodes == null) {
-            currentNodes = blockSeekTo(pos);
-          }
-          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
-          synchronized(infoLock) {
-            if (locatedBlocks.isLastBlockComplete()) {
-              realLen = (int) Math.min(realLen,
-                  locatedBlocks.getFileLength() - pos);
-            }
+      try {
+        if (pos > blockEnd) {
+          blockSeekTo(pos);
+        }
+        int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
+        synchronized (infoLock) {
+          if (locatedBlocks.isLastBlockComplete()) {
+            realLen = (int) Math.min(realLen,
+                locatedBlocks.getFileLength() - pos);
           }
+        }
 
-          /** Number of bytes already read into buffer */
-          int result = 0;
-          while (result < realLen) {
-            /**
-             * Temporary position into the file; {@link pos} might not proceed
-             * to this temporary position in case of exceptions.
-             */
-            long tmpPos = pos + result;
-            /** Start and end offsets of a cell in the file */
-            long cellStart = (tmpPos / cellSize) * cellSize;
-            long cellEnd = cellStart + cellSize - 1;
-
-            /** Number of bytes to read from the current cell */
-            int realLenInCell = (int) Math.min(realLen - result,
-                cellEnd - tmpPos + 1L);
-            assert realLenInCell > 0 : "Temporary position shouldn't be " +
-                "after cellEnd";
-            // Read from one blockReader up to cell boundary
-            int cellRet = readBuffer(blockReaders[idxInGroup],
-                currentNodes[idxInGroup], strategy, off + result,
-                realLenInCell);
-            if (cellRet >= 0) {
-              result += cellRet;
-              if (cellRet < realLenInCell) {
-                // A short read indicates the current blockReader buffer is
-                // already drained. Should return the read call. Otherwise
-                // should proceed to the next cell.
-                break;
-              }
-            } else {
-              // got a EOS from reader though we expect more data on it.
-              throw new IOException("Unexpected EOS from the reader");
+        /** Number of bytes already read into buffer */
+        int result = 0;
+        while (result < realLen) {
+          /**
+           * Temporary position into the file; {@link pos} might not proceed
+           * to this temporary position in case of exceptions.
+           */
+          long tmpPos = pos + result;
+          /** Start and end offsets of a cell in the file */
+          long cellStart = (tmpPos / cellSize) * cellSize;
+          long cellEnd = cellStart + cellSize - 1;
+
+          /** Number of bytes to read from the current cell */
+          int realLenInCell = (int) Math.min(realLen - result,
+              cellEnd - tmpPos + 1L);
+          assert realLenInCell > 0 : "Temporary position shouldn't be "
+              + "after cellEnd";
+
+          // Read from one blockReader up to cell boundary
+          int cellRet = readBuffer(blockReaders[idxInGroup],
+              currentNodes[idxInGroup], strategy, off + result, realLenInCell,
+              corruptedBlockMap);
+          if (cellRet >= 0) {
+            result += cellRet;
+            if (cellRet < realLenInCell) {
+            // A short read indicates the current blockReader buffer is
+            // already drained; we should return from this read call.
+            // Otherwise, proceed to the next cell.
+              break;
             }
-            idxInGroup = (idxInGroup + 1) % dataBlkNum;
-          }
-
-          pos += result;
-
-          if (dfsClient.stats != null) {
-            dfsClient.stats.incrementBytesRead(result);
-          }
-          return result;
-        } catch (ChecksumException ce) {
-          throw ce;
-        } catch (IOException e) {
-          if (retries == 1) {
-            DFSClient.LOG.warn("DFS Read", e);
-          }
-          blockEnd = -1;
-          if (currentNodes[idxInGroup] != null) {
-            addToDeadNodes(currentNodes[idxInGroup]);
+          } else {
+            // got an EOS from the reader though we expect more data on it.
+            throw new IOException("Unexpected EOS from the reader");
           }
-          if (--retries == 0) {
-            throw e;
-          }
-        } finally {
-          // Check if need to report block replicas corruption either read
-          // was successful or ChecksumException occured.
-          reportCheckSumFailure(corruptedBlockMap,
-              currentLocatedBlock.getLocations().length);
+          idxInGroup = (idxInGroup + 1) % dataBlkNum;
         }
+        pos += result;
+        if (dfsClient.stats != null) {
+          dfsClient.stats.incrementBytesRead(result);
+        }
+        return result;
+      } finally {
+        // Check whether we need to report corrupt block replicas, whether
+        // the read was successful or a ChecksumException occurred.
+        reportCheckSumFailure(corruptedBlockMap,
+            currentLocatedBlock.getLocations().length);
       }
     }
     return -1;
   }
 
   private synchronized int readBuffer(BlockReader blockReader,
-      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len)
-      throws IOException {
-    IOException ioe;
-    while (true) {
-      try {
-        return readerStrategy.doRead(blockReader, off, len);
-      } catch ( ChecksumException ce ) {
-        DFSClient.LOG.warn("Found Checksum error for "
-            + getCurrentBlock() + " from " + currentNode
-            + " at " + ce.getPos());
-        // If current block group is corrupt, it's meaningless to retry.
-        // TODO: this should trigger decoding logic (HDFS-7678)
-        throw ce;
-      } catch ( IOException e ) {
-        ioe = e;
-      }
-
-      boolean sourceFound = seekToBlockSource(pos);
-      if (!sourceFound) {
-        throw ioe;
-      }
+      DatanodeInfo currentNode, ReaderStrategy readerStrategy, int off, int len,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+    try {
+      return readerStrategy.doRead(blockReader, off, len);
+    } catch (ChecksumException ce) {
+      DFSClient.LOG.warn("Found Checksum error for "
+          + getCurrentBlock() + " from " + currentNode
+          + " at " + ce.getPos());
+      // remember which block replicas were corrupt so read() can report them
+      addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
+          corruptedBlockMap);
+    } catch (IOException e) {
+      DFSClient.LOG.warn("Exception while reading from "
+          + getCurrentBlock() + " of " + src + " from "
+          + currentNode, e);
     }
-  }
-
-  private boolean seekToBlockSource(long targetPos)
-      throws IOException {
-    currentNodes = blockSeekTo(targetPos);
-    return true;
+    // TODO: this should trigger decoding logic (HDFS-7678)
+    return -1;
   }
 
   protected class ByteBufferStrategy extends DFSInputStream.ByteBufferStrategy {
@@ -418,7 +376,7 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     @Override
     public int doRead(BlockReader blockReader, int off, int len)
-        throws ChecksumException, IOException {
+        throws IOException {
       int oldlimit = buf.limit();
       if (buf.remaining() > len) {
         buf.limit(buf.position() + len);
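
To close, the new shape of readBuffer() from the hunk further above,
reduced to a sketch: the old while(true)/seekToBlockSource() retry loop
is gone, a ChecksumException just records the corrupt replica for the
reporting done in read()'s finally block, and both failure paths return
-1 so the caller ends the stateful read. doRead() and the corrupted-block
bookkeeping below are hypothetical stand-ins:

    import java.io.IOException;

    public class ReadBufferSketch {
      static class ChecksumException extends IOException {}

      // Stand-ins for ReaderStrategy.doRead and the corruptedBlockMap.
      static int doRead(int off, int len) throws IOException { return len; }
      static void addIntoCorruptedBlockMap() {}

      static int readBuffer(int off, int len) {
        try {
          return doRead(off, len);
        } catch (ChecksumException ce) {
          // Remember the corrupt replica; read() reports it in its finally.
          addIntoCorruptedBlockMap();
        } catch (IOException e) {
          // Logged and swallowed in the real code (DFSClient.LOG.warn).
        }
        // TODO in the commit: trigger decoding from parity (HDFS-7678).
        return -1;
      }
    }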