Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/07/02 12:42:45 UTC

[1/2] hadoop git commit: HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 e1ca88641 -> 55bf014a0
  refs/heads/trunk 37d739577 -> bff5999d0


HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff5999d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff5999d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff5999d

Branch: refs/heads/trunk
Commit: bff5999d07e9416a22846c849487e509ede55040
Parents: 37d7395
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jul 2 16:11:50 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Thu Jul 2 16:11:50 2015 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   2 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 233 +++++++++++++------
 .../hadoop/hdfs/TestDFSClientRetries.java       |   2 +-
 4 files changed, 169 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ec37542..7b96c56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -696,6 +696,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
 
+    HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
+    (vinayakumarb)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index f4ceab3..4923a50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1181,7 +1181,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     //    Get block info from namenode
     TraceScope scope = getPathTraceScope("newDFSInputStream", src);
     try {
-      return new DFSInputStream(this, src, verifyChecksum);
+      return new DFSInputStream(this, src, verifyChecksum, null);
     } finally {
       scope.close();
     }
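
The only caller-side change: DFSClient now passes an explicit LocatedBlocks argument, where null keeps the old behaviour of fetching locations from the NameNode. A hedged sketch of the two call shapes the new package-private constructor allows (preFetched is an illustrative name, not part of this commit; the striped-read caller lives on the ErasureCoding branch):

    // null -> openInfo(false) fetches locations via getLocatedBlocks()
    new DFSInputStream(this, src, verifyChecksum, null);
    // pre-fetched -> the constructor reuses them, skipping one NameNode RPC
    new DFSInputStream(this, src, verifyChecksum, preFetched);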

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 6563d7b..7f3722f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -94,35 +95,35 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
-  private final DFSClient dfsClient;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private final String src;
-  private final boolean verifyChecksum;
+  protected final DFSClient dfsClient;
+  protected AtomicBoolean closed = new AtomicBoolean(false);
+  protected final String src;
+  protected final boolean verifyChecksum;
 
   // state by stateful read only:
   // (protected by lock on this)
   /////
   private DatanodeInfo currentNode = null;
-  private LocatedBlock currentLocatedBlock = null;
-  private long pos = 0;
-  private long blockEnd = -1;
+  protected LocatedBlock currentLocatedBlock = null;
+  protected long pos = 0;
+  protected long blockEnd = -1;
   private BlockReader blockReader = null;
   ////
 
   // state shared by stateful and positional read:
   // (protected by lock on infoLock)
   ////
-  private LocatedBlocks locatedBlocks = null;
+  protected LocatedBlocks locatedBlocks = null;
   private long lastBlockBeingWrittenLength = 0;
   private FileEncryptionInfo fileEncryptionInfo = null;
-  private CachingStrategy cachingStrategy;
+  protected CachingStrategy cachingStrategy;
   ////
 
-  private final ReadStatistics readStatistics = new ReadStatistics();
+  protected final ReadStatistics readStatistics = new ReadStatistics();
   // lock for state shared between read and pread
   // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
   //       (it's OK to acquire this lock when the lock on <this> is held)
-  private final Object infoLock = new Object();
+  protected final Object infoLock = new Object();
 
   /**
    * Track the ByteBuffers that we have handed out to readers.
@@ -239,7 +240,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * back to the namenode to get a new list of block locations, and is
    * capped at maxBlockAcquireFailures
    */
-  private int failures = 0;
+  protected int failures = 0;
 
   /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
    * parallel accesses to DFSInputStream (through ptreads) properly */
@@ -252,24 +253,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     deadNodes.put(dnInfo, dnInfo);
   }
   
-  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
-                 ) throws IOException, UnresolvedLinkException {
+  DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
+      LocatedBlocks locatedBlocks) throws IOException, UnresolvedLinkException {
     this.dfsClient = dfsClient;
     this.verifyChecksum = verifyChecksum;
     this.src = src;
     synchronized (infoLock) {
       this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
     }
-    openInfo();
+    this.locatedBlocks = locatedBlocks;
+    openInfo(false);
   }
 
   /**
    * Grab the open-file info from namenode
+   * @param refreshLocatedBlocks whether to re-fetch locatedblocks
    */
-  void openInfo() throws IOException, UnresolvedLinkException {
+  void openInfo(boolean refreshLocatedBlocks) throws IOException,
+      UnresolvedLinkException {
     final DfsClientConf conf = dfsClient.getConf();
     synchronized(infoLock) {
-      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+      lastBlockBeingWrittenLength =
+          fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
       int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
       while (retriesForLastBlockLength > 0) {
         // Getting last block length as -1 is a special case. When cluster
@@ -281,7 +286,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
               + "Datanodes might not have reported blocks completely."
               + " Will retry for " + retriesForLastBlockLength + " times");
           waitFor(conf.getRetryIntervalForGetLastBlockLength());
-          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+          lastBlockBeingWrittenLength =
+              fetchLocatedBlocksAndGetLastBlockLength(true);
         } else {
           break;
         }
@@ -302,8 +308,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
-    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
+  private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
+      throws IOException {
+    LocatedBlocks newInfo = locatedBlocks;
+    if (locatedBlocks == null || refresh) {
+      newInfo = dfsClient.getLocatedBlocks(src, 0);
+    }
     if (DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug("newInfo = " + newInfo);
     }
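
The refactor above is a lazy-initialization pattern: reuse the LocatedBlocks supplied at construction unless none were supplied or the caller explicitly asks for a refresh. A self-contained sketch of the same pattern in plain Java (all names illustrative; expensiveRemoteCall stands in for dfsClient.getLocatedBlocks(src, 0)):

    class LazyFetchSketch {
      private String cached;                // stands in for the locatedBlocks field

      LazyFetchSketch(String preFetched) {
        this.cached = preFetched;           // may be null, as in the new constructor
      }

      String fetch(boolean refresh) {
        String info = cached;
        if (cached == null || refresh) {
          info = expensiveRemoteCall();     // the only path that costs a round trip
        }
        cached = info;
        return info;
      }

      private String expensiveRemoteCall() {
        return "fresh-block-locations";
      }

      public static void main(String[] args) {
        LazyFetchSketch s = new LazyFetchSketch("pre-fetched");
        System.out.println(s.fetch(false)); // pre-fetched: no remote call
        System.out.println(s.fetch(true));  // fresh-block-locations
      }
    }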
@@ -441,7 +451,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return located block
    * @throws IOException
    */
-  private LocatedBlock getBlockAt(long offset) throws IOException {
+  protected LocatedBlock getBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       assert (locatedBlocks != null) : "locatedBlocks is null";
 
@@ -476,7 +486,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Fetch a block from namenode and cache it */
-  private void fetchBlockAt(long offset) throws IOException {
+  protected void fetchBlockAt(long offset) throws IOException {
     synchronized(infoLock) {
       int targetBlockIdx = locatedBlocks.findBlock(offset);
       if (targetBlockIdx < 0) { // block is not cached
@@ -579,7 +589,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
 
     // Will be getting a new BlockReader.
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
 
     //
     // Connect to best DataNode for desired Block, with potential offset
@@ -620,7 +630,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         return chosenNode;
       } catch (IOException ex) {
         if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
-          DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
+          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
               + "encryption key was invalid when connecting to " + targetAddr
               + " : " + ex);
           // The encryption key used is invalid.
@@ -631,8 +641,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           fetchBlockAt(target);
         } else {
           connectFailedOnce = true;
-          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block "
-            +targetBlock.getBlock()+ ", add to deadNodes and continue. " + ex, ex);
+          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+            + ", add to deadNodes and continue. " + ex, ex);
           // Put chosen node into dead list, continue
           addToDeadNodes(chosenNode);
         }
@@ -696,7 +706,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           "unreleased ByteBuffers allocated by read().  " +
           "Please release " + builder.toString() + ".");
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
     super.close();
   }
 
@@ -713,12 +723,22 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Wraps different possible read implementations so that readBuffer can be
    * strategy-agnostic.
    */
-  private interface ReaderStrategy {
+  interface ReaderStrategy {
     public int doRead(BlockReader blockReader, int off, int len)
         throws ChecksumException, IOException;
+
+    /**
+     * Copy data from the src ByteBuffer into the read buffer.
+     * @param src The src buffer where the data is copied from
+     * @param offset Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the offset of the byte array for copy.
+     * @param length Useful only when the ReadStrategy is based on a byte array.
+     *               Indicate the length of the data to copy.
+     */
+    public int copyFrom(ByteBuffer src, int offset, int length);
   }
 
-  private void updateReadStatistics(ReadStatistics readStatistics, 
+  protected void updateReadStatistics(ReadStatistics readStatistics,
         int nRead, BlockReader blockReader) {
     if (nRead <= 0) return;
     synchronized(infoLock) {
@@ -749,12 +769,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       updateReadStatistics(readStatistics, nRead, blockReader);
       return nRead;
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      writeSlice.get(buf, offset, length);
+      return length;
+    }
   }
 
   /**
    * Used to read bytes into a user-supplied ByteBuffer
    */
-  private class ByteBufferStrategy implements ReaderStrategy {
+  protected class ByteBufferStrategy implements ReaderStrategy {
     final ByteBuffer buf;
     ByteBufferStrategy(ByteBuffer buf) {
       this.buf = buf;
@@ -770,6 +797,9 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         int ret = blockReader.read(buf);
         success = true;
         updateReadStatistics(readStatistics, ret, blockReader);
+        if (ret == 0) {
+          DFSClient.LOG.warn("zero");
+        }
         return ret;
       } finally {
         if (!success) {
@@ -779,6 +809,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         }
       } 
     }
+
+    @Override
+    public int copyFrom(ByteBuffer src, int offset, int length) {
+      ByteBuffer writeSlice = src.duplicate();
+      int remaining = Math.min(buf.remaining(), writeSlice.remaining());
+      writeSlice.limit(writeSlice.position() + remaining);
+      buf.put(writeSlice);
+      return remaining;
+    }
   }
 
   /* This is a used by regular read() and handles ChecksumExceptions.
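
Both copyFrom() implementations above lean on ByteBuffer.duplicate(), which shares the underlying bytes but carries an independent position and limit, so draining the slice never disturbs the source buffer's cursor. A runnable plain-NIO demonstration of the two copy shapes (no HDFS types involved):

    import java.nio.ByteBuffer;

    public class CopyFromDemo {
      public static void main(String[] args) {
        ByteBuffer src = ByteBuffer.wrap("hello world".getBytes());

        // Byte-array flavour: bulk get through a duplicate; src stays put.
        byte[] dst = new byte[5];
        ByteBuffer slice = src.duplicate();
        slice.get(dst, 0, 5);
        System.out.println(new String(dst) + " / src pos = " + src.position());
        // prints: hello / src pos = 0

        // ByteBuffer flavour: cap the copy at the smaller of both remainders.
        ByteBuffer out = ByteBuffer.allocate(4);
        ByteBuffer writeSlice = src.duplicate();
        int n = Math.min(out.remaining(), writeSlice.remaining());
        writeSlice.limit(writeSlice.position() + n);
        out.put(writeSlice);                // copies exactly n bytes
        System.out.println(n);              // prints: 4
      }
    }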
@@ -837,7 +876,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  protected synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed.get()) {
       throw new IOException("Stream closed");
@@ -926,7 +965,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    * Add corrupted block replica into map.
    */
-  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node, 
+  protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
     Set<DatanodeInfo> dnSet = null;
     if((corruptedBlockMap.containsKey(blk))) {
@@ -985,8 +1024,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         } catch (InterruptedException iex) {
         }
         deadNodes.clear(); //2nd option is to remove only nodes[blockId]
-        openInfo();
-        block = getBlockAt(block.getStartOffset());
+        openInfo(true);
+        block = refreshLocatedBlock(block);
         failures++;
       }
     }
@@ -998,7 +1037,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param ignoredNodes Do not choose nodes in this array (may be null)
    * @return The DNAddrPair of the best node. Null if no node can be chosen.
    */
-  private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
+  protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) {
     DatanodeInfo[] nodes = block.getLocations();
     StorageType[] storageTypes = block.getStorageTypes();
@@ -1058,15 +1097,15 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return errMsgr.toString();
   }
 
-  private void fetchBlockByteRange(long blockStartOffset, long start, long end,
+  protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
       byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       try {
-        actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
+        actualGetFromOneDataNode(addressPair, block, start, end,
             buf, offset, corruptedBlockMap);
         return;
       } catch (IOException e) {
@@ -1077,7 +1116,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
-      final long blockStartOffset, final long start, final long end,
+      final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
       final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
       final int hedgedReadId) {
@@ -1090,7 +1129,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         TraceScope scope =
             Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
         try {
-          actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
+          actualGetFromOneDataNode(datanode, block, start, end, buf,
               offset, corruptedBlockMap);
           return bb;
         } finally {
@@ -1100,31 +1139,60 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     };
   }
 
+  /**
+   * Used when reading contiguous blocks
+   */
   private void actualGetFromOneDataNode(final DNAddrPair datanode,
-      long blockStartOffset, final long start, final long end, byte[] buf,
+      LocatedBlock block, final long start, final long end, byte[] buf,
       int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
+    final int length = (int) (end - start + 1);
+    actualGetFromOneDataNode(datanode, block, start, end, buf,
+        new int[]{offset}, new int[]{length}, corruptedBlockMap);
+  }
+
+  /**
+   * Read data from one DataNode.
+   * @param datanode the datanode from which to read data
+   * @param block the located block containing the requested data
+   * @param startInBlk the startInBlk offset of the block
+   * @param endInBlk the endInBlk offset of the block
+   * @param buf the given byte array into which the data is read
+   * @param offsets the data may be read into multiple segments of the buf
+   *                (when reading a striped block). this array indicates the
+   *                offset of each buf segment.
+   * @param lengths the length of each buf segment
+   * @param corruptedBlockMap map recording list of datanodes with corrupted
+   *                          block replica
+   */
+  void actualGetFromOneDataNode(final DNAddrPair datanode,
+      LocatedBlock block, final long startInBlk, final long endInBlk,
+      byte[] buf, int[] offsets, int[] lengths,
+      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
+    final int len = (int) (endInBlk - startInBlk + 1);
+    checkReadPortions(offsets, lengths, len);
 
     while (true) {
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the
       // start of the loop.
-      LocatedBlock block = getBlockAt(blockStartOffset);
+      block = refreshLocatedBlock(block);
       BlockReader reader = null;
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
-        int len = (int) (end - start + 1);
-        reader = getBlockReader(block, start, len, datanode.addr,
+        reader = getBlockReader(block, startInBlk, len, datanode.addr,
             datanode.storageType, datanode.info);
-        int nread = reader.readAll(buf, offset, len);
-        updateReadStatistics(readStatistics, nread, reader);
-
-        if (nread != len) {
-          throw new IOException("truncated return from reader.read(): " +
-                                "excpected " + len + ", got " + nread);
+        for (int i = 0; i < offsets.length; i++) {
+          int nread = reader.readAll(buf, offsets[i], lengths[i]);
+          updateReadStatistics(readStatistics, nread, reader);
+          if (nread != lengths[i]) {
+            throw new IOException("truncated return from reader.read(): " +
+                "excpected " + lengths[i] + ", got " + nread);
+          }
         }
         DFSClientFaultInjector.get().readFromDatanodeDelay();
         return;
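
The loop above generalizes the old single readAll() call into one readAll() per destination segment, which is what lets a striped reader land one block's data in several disjoint regions of the same array. A minimal, self-contained model of that multi-segment drain (plain java.io; values are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;

    public class SegmentedReadDemo {
      public static void main(String[] args) throws IOException {
        byte[] source = "abcdefghij".getBytes();
        DataInputStream reader =
            new DataInputStream(new ByteArrayInputStream(source));

        byte[] buf = new byte[16];
        int[] offsets = {0, 8};             // disjoint, increasing
        int[] lengths = {5, 5};             // sum to the total read length
        for (int i = 0; i < offsets.length; i++) {
          reader.readFully(buf, offsets[i], lengths[i]); // like reader.readAll(...)
        }
        System.out.println(new String(buf, 0, 5) + "|" + new String(buf, 8, 5));
        // prints: abcde|fghij
      }
    }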
@@ -1169,11 +1237,40 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /**
-   * Like {@link #fetchBlockByteRange} except we start up a second, parallel,
+   * Refresh cached block locations.
+   * @param block The currently cached block locations
+   * @return Refreshed block locations
+   * @throws IOException
+   */
+  protected LocatedBlock refreshLocatedBlock(LocatedBlock block)
+      throws IOException {
+    return getBlockAt(block.getStartOffset());
+  }
+
+  /**
+   * This method verifies that the read portions are valid and do not overlap
+   * with each other.
+   */
+  private void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
+    Preconditions.checkArgument(offsets.length == lengths.length && offsets.length > 0);
+    int sum = 0;
+    for (int i = 0; i < lengths.length; i++) {
+      if (i > 0) {
+        int gap = offsets[i] - offsets[i - 1];
+        // make sure read portions do not overlap with each other
+        Preconditions.checkArgument(gap >= lengths[i - 1]);
+      }
+      sum += lengths[i];
+    }
+    Preconditions.checkArgument(sum == totalLen);
+  }
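
checkReadPortions() enforces the contract behind those offset/length arrays: equal arity, non-empty, in-order and non-overlapping segments, with lengths summing to the total. A self-contained equivalent in plain Java, using IllegalArgumentException in place of Guava's Preconditions (illustrative, not the committed code):

    public class ReadPortionsCheck {
      static void checkReadPortions(int[] offsets, int[] lengths, int totalLen) {
        if (offsets.length != lengths.length || offsets.length == 0) {
          throw new IllegalArgumentException("mismatched or empty portions");
        }
        int sum = 0;
        for (int i = 0; i < lengths.length; i++) {
          if (i > 0 && offsets[i] - offsets[i - 1] < lengths[i - 1]) {
            throw new IllegalArgumentException("portions overlap");
          }
          sum += lengths[i];
        }
        if (sum != totalLen) {
          throw new IllegalArgumentException("lengths do not sum to total");
        }
      }

      public static void main(String[] args) {
        checkReadPortions(new int[]{0, 10}, new int[]{10, 5}, 15); // passes
        checkReadPortions(new int[]{0, 5}, new int[]{10, 5}, 15);  // throws: overlap
      }
    }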
+
+  /**
+   * Like {@link #fetchBlockByteRange}except we start up a second, parallel,
    * 'hedged' read if the first read is taking longer than configured amount of
    * time. We then wait on which ever read returns first.
    */
-  private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
+  private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
@@ -1186,7 +1283,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
     int hedgedReadId = 0;
-    LocatedBlock block = getBlockAt(blockStartOffset);
+    block = refreshLocatedBlock(block);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1198,7 +1295,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
         chosenNode = chooseDataNode(block, ignored);
         bb = ByteBuffer.wrap(buf, offset, len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-            chosenNode, block.getStartOffset(), start, end, bb,
+            chosenNode, block, start, end, bb,
             corruptedBlockMap, hedgedReadId++);
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
@@ -1235,7 +1332,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
           }
           bb = ByteBuffer.allocate(len);
           Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
-              chosenNode, block.getStartOffset(), start, end, bb,
+              chosenNode, block, start, end, bb,
               corruptedBlockMap, hedgedReadId++);
           Future<ByteBuffer> oneMoreRequest = hedgedService
               .submit(getFromDataNodeCallable);
@@ -1319,7 +1416,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return true if block access token has expired or invalid and it should be
    *         refetched
    */
-  private static boolean tokenRefetchNeeded(IOException ex,
+  protected static boolean tokenRefetchNeeded(IOException ex,
       InetSocketAddress targetAddr) {
     /*
      * Get a new access token and retry. Retry is needed in 2 cases. 1)
@@ -1389,13 +1486,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
         if (dfsClient.isHedgedReadsEnabled()) {
-          hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          hedgedFetchBlockByteRange(blk, targetStart,
+              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
         } else {
-          fetchBlockByteRange(blk.getStartOffset(), targetStart,
-              targetStart + bytesToRead - 1, buffer, offset,
-              corruptedBlockMap);
+          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
+              buffer, offset, corruptedBlockMap);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
@@ -1427,7 +1522,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @param corruptedBlockMap map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
-  private void reportCheckSumFailure(
+  protected void reportCheckSumFailure(
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, 
       int dataNodeCount) {
     if (corruptedBlockMap.isEmpty()) {
@@ -1556,7 +1651,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   /**
    */
   @Override
-  public synchronized long getPos() throws IOException {
+  public synchronized long getPos() {
     return pos;
   }
 
@@ -1590,7 +1685,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   }
 
   /** Utility class to encapsulate data node info and its address. */
-  private static final class DNAddrPair {
+  static final class DNAddrPair {
     final DatanodeInfo info;
     final InetSocketAddress addr;
     final StorageType storageType;
@@ -1627,7 +1722,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private void closeCurrentBlockReader() {
+  protected void closeCurrentBlockReaders() {
     if (blockReader == null) return;
     // Close the current block reader so that the new caching settings can 
     // take effect immediately.
@@ -1647,7 +1742,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   @Override
@@ -1657,7 +1752,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       this.cachingStrategy =
           new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
     }
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 
   /**
@@ -1815,6 +1910,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
   @Override
   public synchronized void unbuffer() {
-    closeCurrentBlockReader();
+    closeCurrentBlockReaders();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff5999d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index 68cc155..43e0eb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -343,7 +343,7 @@ public class TestDFSClientRetries {
       // we're starting a new operation on the user level.
       doAnswer(new FailNTimesAnswer(preSpyNN, maxBlockAcquires))
         .when(spyNN).getBlockLocations(anyString(), anyLong(), anyLong());
-      is.openInfo();
+      is.openInfo(true);
       // Seek to beginning forces a reopen of the BlockReader - otherwise it'll
       // just keep reading on the existing stream and the fact that we've poisoned
       // the block info won't do anything.


[2/2] hadoop git commit: HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)

Posted by vi...@apache.org.
HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch (Contributed by Vinayakumar B)

(cherry picked from commit bff5999d07e9416a22846c849487e509ede55040)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/55bf014a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/55bf014a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/55bf014a

Branch: refs/heads/branch-2
Commit: 55bf014a07160bf4669b3a1ac7ff925faec4982e
Parents: e1ca886
Author: Vinayakumar B <vi...@apache.org>
Authored: Thu Jul 2 16:11:50 2015 +0530
Committer: Vinayakumar B <vi...@apache.org>
Committed: Thu Jul 2 16:12:13 2015 +0530

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |   2 +-
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 233 +++++++++++++------
 .../hadoop/hdfs/TestDFSClientRetries.java       |   2 +-
 4 files changed, 169 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/55bf014a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1fe1be3..2858372 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -357,6 +357,9 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-8666. Speedup the TestMover tests. (Walter Su via jing9)
 
+    HDFS-8703. Merge refactor of DFSInputStream from ErasureCoding branch
+    (vinayakumarb)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

(The changes to DFSClient.java, DFSInputStream.java, and TestDFSClientRetries.java in this cherry-pick are identical to the trunk commit in [1/2] above, apart from diff index hashes and context line numbers, and are not repeated here.)