Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2016/02/25 18:56:05 UTC

hadoop git commit: HDFS-9734. Refactoring of checksum failure report related codes. Contributed by Kai Zheng.

Repository: hadoop
Updated Branches:
  refs/heads/trunk ccff6035f -> 8808779db


HDFS-9734. Refactoring of checksum failure report related codes. Contributed by Kai Zheng.

Change-Id: Ie69a77e3498a360959f8e213c51fb2b17c28b64a


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8808779d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8808779d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8808779d

Branch: refs/heads/trunk
Commit: 8808779db351fe444388d4acb3094766b5980718
Parents: ccff603
Author: Zhe Zhang <zh...@apache.org>
Authored: Thu Feb 25 09:55:50 2016 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Thu Feb 25 09:55:50 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 72 +++++++-------------
 .../hadoop/hdfs/DFSStripedInputStream.java      | 46 ++++++-------
 .../org/apache/hadoop/hdfs/DFSUtilClient.java   | 36 ++++++++++
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hadoop/hdfs/server/datanode/DataNode.java   | 16 ++++-
 .../erasurecode/ErasureCodingWorker.java        | 50 ++++----------
 .../hadoop/hdfs/TestReconstructStripedFile.java |  4 +-
 7 files changed, 115 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
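
The patch replaces the Map<ExtendedBlock, Set<DatanodeInfo>> plumbing that each read path maintained by hand with a single DFSUtilClient.CorruptedBlocks container shared by DFSInputStream, DFSStripedInputStream and the datanode-side ErasureCodingWorker. The following is a minimal sketch of the resulting caller pattern, not code from the patch: readAndVerify() and reportTo() are hypothetical placeholders for the real read paths (readBuffer, actualGetFromOneDataNode, readFromBlock) and the real reporting calls (reportCheckSumFailure on the client, DataNode#reportCorruptedBlocks on the datanode).

  import java.io.IOException;

  import org.apache.hadoop.fs.ChecksumException;
  import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

  // Illustrative sketch only; readAndVerify() and reportTo() stand in for the
  // actual read and report paths touched by this patch.
  class CorruptedBlocksCallerSketch {
    void readWithCorruptionTracking(ExtendedBlock blk, DatanodeInfo node)
        throws IOException {
      CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
      try {
        readAndVerify(blk, node);
      } catch (ChecksumException ce) {
        // Remember which replica failed its checksum; the container keeps
        // one Set<DatanodeInfo> per block.
        corruptedBlocks.addCorruptedBlock(blk, node);
        throw ce;
      } finally {
        // Report in one place, whether the read succeeded or a
        // ChecksumException occurred.
        reportTo(corruptedBlocks);
      }
    }

    // Hypothetical placeholders for this sketch.
    void readAndVerify(ExtendedBlock blk, DatanodeInfo node) throws IOException {}
    void reportTo(CorruptedBlocks corruptedBlocks) {}
  }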


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8808779d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3c91ca1..d713e8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -26,8 +26,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -57,6 +55,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -857,7 +856,7 @@ public class DFSInputStream extends FSInputStream
    * ChecksumFileSystem
    */
   private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+                                      CorruptedBlocks corruptedBlocks)
       throws IOException {
     IOException ioe;
 
@@ -880,8 +879,7 @@ public class DFSInputStream extends FSInputStream
         ioe = ce;
         retryCurrentNode = false;
         // we want to remember which block replicas we have tried
-        addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
-            corruptedBlockMap);
+        corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode);
       } catch ( IOException e ) {
         if (!retryCurrentNode) {
           DFSClient.LOG.warn("Exception while reading from "
@@ -914,7 +912,8 @@ public class DFSInputStream extends FSInputStream
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
-    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
+
+    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     failures = 0;
     if (pos < getFileLength()) {
       int retries = 2;
@@ -932,7 +931,7 @@ public class DFSInputStream extends FSInputStream
                   locatedBlocks.getFileLength() - pos);
             }
           }
-          int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
+          int result = readBuffer(strategy, off, realLen, corruptedBlocks);
 
           if (result >= 0) {
             pos += result;
@@ -958,7 +957,7 @@ public class DFSInputStream extends FSInputStream
         } finally {
           // Check if need to report block replicas corruption either read
           // was successful or ChecksumException occured.
-          reportCheckSumFailure(corruptedBlockMap,
+          reportCheckSumFailure(corruptedBlocks,
               currentLocatedBlock.getLocations().length, false);
         }
       }
@@ -999,24 +998,6 @@ public class DFSInputStream extends FSInputStream
     }
   }
 
-
-  /**
-   * Add corrupted block replica into map.
-   */
-  protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
-    Set<DatanodeInfo> dnSet;
-    if((corruptedBlockMap.containsKey(blk))) {
-      dnSet = corruptedBlockMap.get(blk);
-    }else {
-      dnSet = new HashSet<>();
-    }
-    if (!dnSet.contains(node)) {
-      dnSet.add(node);
-      corruptedBlockMap.put(blk, dnSet);
-    }
-  }
-
   private DNAddrPair chooseDataNode(LocatedBlock block,
       Collection<DatanodeInfo> ignoredNodes) throws IOException {
     while (true) {
@@ -1143,15 +1124,14 @@ public class DFSInputStream extends FSInputStream
   }
 
   protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
-      byte[] buf, int offset,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
       throws IOException {
     block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
       try {
         actualGetFromOneDataNode(addressPair, block, start, end,
-            buf, offset, corruptedBlockMap);
+            buf, offset, corruptedBlocks);
         return;
       } catch (IOException e) {
         // Ignore. Already processed inside the function.
@@ -1163,7 +1143,7 @@ public class DFSInputStream extends FSInputStream
   private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       final LocatedBlock block, final long start, final long end,
       final ByteBuffer bb,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+      final CorruptedBlocks corruptedBlocks,
       final int hedgedReadId) {
     final SpanId parentSpanId = Tracer.getCurrentSpanId();
     return new Callable<ByteBuffer>() {
@@ -1174,7 +1154,7 @@ public class DFSInputStream extends FSInputStream
         try (TraceScope ignored = dfsClient.getTracer().
             newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
           actualGetFromOneDataNode(datanode, block, start, end, buf,
-              offset, corruptedBlockMap);
+              offset, corruptedBlocks);
           return bb;
         }
       }
@@ -1190,12 +1170,12 @@ public class DFSInputStream extends FSInputStream
    * @param endInBlk          the endInBlk offset of the block
    * @param buf               the given byte array into which the data is read
    * @param offset            the offset in buf
-   * @param corruptedBlockMap map recording list of datanodes with corrupted
+   * @param corruptedBlocks   map recording list of datanodes with corrupted
    *                          block replica
    */
   void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
       final long startInBlk, final long endInBlk, byte[] buf, int offset,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+                                CorruptedBlocks corruptedBlocks)
       throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
@@ -1226,8 +1206,7 @@ public class DFSInputStream extends FSInputStream
             + datanode.info;
         DFSClient.LOG.warn(msg);
         // we want to remember what we have tried
-        addIntoCorruptedBlockMap(block.getBlock(), datanode.info,
-            corruptedBlockMap);
+        corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info);
         addToDeadNodes(datanode.info);
         throw new IOException(msg);
       } catch (IOException e) {
@@ -1277,8 +1256,7 @@ public class DFSInputStream extends FSInputStream
    * time. We then wait on which ever read returns first.
    */
   private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
-      long end, byte[] buf, int offset,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
       throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
     ArrayList<Future<ByteBuffer>> futures = new ArrayList<>();
@@ -1301,7 +1279,7 @@ public class DFSInputStream extends FSInputStream
         bb = ByteBuffer.wrap(buf, offset, len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
             chosenNode, block, start, end, bb,
-            corruptedBlockMap, hedgedReadId++);
+            corruptedBlocks, hedgedReadId++);
         Future<ByteBuffer> firstRequest = hedgedService
             .submit(getFromDataNodeCallable);
         futures.add(firstRequest);
@@ -1333,7 +1311,7 @@ public class DFSInputStream extends FSInputStream
           bb = ByteBuffer.allocate(len);
           Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
               chosenNode, block, start, end, bb,
-              corruptedBlockMap, hedgedReadId++);
+              corruptedBlocks, hedgedReadId++);
           Future<ByteBuffer> oneMoreRequest = hedgedService
               .submit(getFromDataNodeCallable);
           futures.add(oneMoreRequest);
@@ -1476,23 +1454,23 @@ public class DFSInputStream extends FSInputStream
     // corresponding to position and realLen
     List<LocatedBlock> blockRange = getBlockRange(position, realLen);
     int remaining = realLen;
-    Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap = new HashMap<>();
+    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     for (LocatedBlock blk : blockRange) {
       long targetStart = position - blk.getStartOffset();
       long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
       try {
         if (dfsClient.isHedgedReadsEnabled() && !blk.isStriped()) {
           hedgedFetchBlockByteRange(blk, targetStart,
-              targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
+              targetStart + bytesToRead - 1, buffer, offset, corruptedBlocks);
         } else {
           fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
-              buffer, offset, corruptedBlockMap);
+              buffer, offset, corruptedBlocks);
         }
       } finally {
         // Check and report if any block replicas are corrupted.
         // BlockMissingException may be caught if all block replicas are
         // corrupted.
-        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length,
+        reportCheckSumFailure(corruptedBlocks, blk.getLocations().length,
             false);
       }
 
@@ -1523,12 +1501,14 @@ public class DFSInputStream extends FSInputStream
    * corresponding to each internal block. For this case we simply report the
    * corrupted blocks to NameNode and ignore the above logic.
    *
-   * @param corruptedBlockMap map of corrupted blocks
+   * @param corruptedBlocks map of corrupted blocks
    * @param dataNodeCount number of data nodes who contains the block replicas
    */
-  protected void reportCheckSumFailure(
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
+  protected void reportCheckSumFailure(CorruptedBlocks corruptedBlocks,
       int dataNodeCount, boolean isStriped) {
+
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
+        corruptedBlocks.getCorruptionMap();
     if (corruptedBlockMap.isEmpty()) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8808779d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 3483255..d4174d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
 
@@ -282,8 +283,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    * Read a new stripe covering the current position, and store the data in the
    * {@link #curStripeBuf}.
    */
-  private void readOneStripe(
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+  private void readOneStripe(CorruptedBlocks corruptedBlocks)
       throws IOException {
     resetCurStripeBuffer();
 
@@ -307,7 +307,7 @@ public class DFSStripedInputStream extends DFSInputStream {
     for (AlignedStripe stripe : stripes) {
       // Parse group to get chosen DN location
       StripeReader sreader = new StatefulStripeReader(readingService, stripe,
-          blks, blockReaders, corruptedBlockMap);
+          blks, blockReaders, corruptedBlocks);
       sreader.readStripe();
     }
     curStripeBuf.position(stripeBufOffset);
@@ -319,7 +319,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       final DatanodeInfo datanode, final long currentReaderOffset,
       final long targetReaderOffset, final ByteBufferStrategy[] strategies,
       final ExtendedBlock currentBlock,
-      final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+      final CorruptedBlocks corruptedBlocks) {
     return new Callable<Void>() {
       @Override
       public Void call() throws Exception {
@@ -338,7 +338,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         int result = 0;
         for (ByteBufferStrategy strategy : strategies) {
           result += readToBuffer(reader, datanode, strategy, currentBlock,
-              corruptedBlockMap);
+              corruptedBlocks);
         }
         return null;
       }
@@ -348,7 +348,7 @@ public class DFSStripedInputStream extends DFSInputStream {
   private int readToBuffer(BlockReader blockReader,
       DatanodeInfo currentNode, ByteBufferStrategy strategy,
       ExtendedBlock currentBlock,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      CorruptedBlocks corruptedBlocks)
       throws IOException {
     final int targetLength = strategy.buf.remaining();
     int length = 0;
@@ -366,8 +366,7 @@ public class DFSStripedInputStream extends DFSInputStream {
           + currentBlock + " from " + currentNode
           + " at " + ce.getPos());
       // we want to remember which block replicas we have tried
-      addIntoCorruptedBlockMap(currentBlock, currentNode,
-          corruptedBlockMap);
+      corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
       throw ce;
     } catch (IOException e) {
       DFSClient.LOG.warn("Exception while reading from "
@@ -423,8 +422,8 @@ public class DFSStripedInputStream extends DFSInputStream {
     if (closed.get()) {
       throw new IOException("Stream closed");
     }
-    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap =
-        new ConcurrentHashMap<>();
+
+    CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
     if (pos < getFileLength()) {
       try {
         if (pos > blockEnd) {
@@ -442,7 +441,7 @@ public class DFSStripedInputStream extends DFSInputStream {
         int result = 0;
         while (result < realLen) {
           if (!curStripeRange.include(getOffsetInBlockGroup())) {
-            readOneStripe(corruptedBlockMap);
+            readOneStripe(corruptedBlocks);
           }
           int ret = copyToTargetBuf(strategy, off + result, realLen - result);
           result += ret;
@@ -455,7 +454,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       } finally {
         // Check if need to report block replicas corruption either read
         // was successful or ChecksumException occured.
-        reportCheckSumFailure(corruptedBlockMap,
+        reportCheckSumFailure(corruptedBlocks,
             currentLocatedBlock.getLocations().length, true);
       }
     }
@@ -519,8 +518,7 @@ public class DFSStripedInputStream extends DFSInputStream {
    */
   @Override
   protected void fetchBlockByteRange(LocatedBlock block, long start,
-      long end, byte[] buf, int offset,
-      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
+      long end, byte[] buf, int offset, CorruptedBlocks corruptedBlocks)
       throws IOException {
     // Refresh the striped block group
     LocatedStripedBlock blockGroup = getBlockGroupAt(block.getStartOffset());
@@ -536,7 +534,7 @@ public class DFSStripedInputStream extends DFSInputStream {
       for (AlignedStripe stripe : stripes) {
         // Parse group to get chosen DN location
         StripeReader preader = new PositionStripeReader(readService, stripe,
-            blks, preaderInfos, corruptedBlockMap);
+            blks, preaderInfos, corruptedBlocks);
         preader.readStripe();
       }
     } finally {
@@ -575,17 +573,17 @@ public class DFSStripedInputStream extends DFSInputStream {
     final AlignedStripe alignedStripe;
     final CompletionService<Void> service;
     final LocatedBlock[] targetBlocks;
-    final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap;
+    final CorruptedBlocks corruptedBlocks;
     final BlockReaderInfo[] readerInfos;
 
     StripeReader(CompletionService<Void> service, AlignedStripe alignedStripe,
         LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+                 CorruptedBlocks corruptedBlocks) {
       this.service = service;
       this.alignedStripe = alignedStripe;
       this.targetBlocks = targetBlocks;
       this.readerInfos = readerInfos;
-      this.corruptedBlockMap = corruptedBlockMap;
+      this.corruptedBlocks = corruptedBlocks;
     }
 
     /** prepare all the data chunks */
@@ -731,7 +729,7 @@ public class DFSStripedInputStream extends DFSInputStream {
           readerInfos[chunkIndex].datanode,
           readerInfos[chunkIndex].blockReaderOffset,
           alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
-          block.getBlock(), corruptedBlockMap);
+          block.getBlock(), corruptedBlocks);
 
       Future<Void> request = service.submit(readCallable);
       futures.put(request, chunkIndex);
@@ -812,10 +810,9 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     PositionStripeReader(CompletionService<Void> service,
         AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
-        BlockReaderInfo[] readerInfos,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+        BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
       super(service, alignedStripe, targetBlocks, readerInfos,
-          corruptedBlockMap);
+          corruptedBlocks);
     }
 
     @Override
@@ -849,10 +846,9 @@ public class DFSStripedInputStream extends DFSInputStream {
 
     StatefulStripeReader(CompletionService<Void> service,
         AlignedStripe alignedStripe, LocatedBlock[] targetBlocks,
-        BlockReaderInfo[] readerInfos,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
+        BlockReaderInfo[] readerInfos, CorruptedBlocks corruptedBlocks) {
       super(service, alignedStripe, targetBlocks, readerInfos,
-          corruptedBlockMap);
+          corruptedBlocks);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8808779d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 8f6ed14..d646252 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -69,9 +70,11 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@@ -681,4 +684,37 @@ public class DFSUtilClient {
     iioe.initCause(e);
     return iioe;
   }
+
+  /**
+   * A utility class as a container to put corrupted blocks, shared by client
+   * and datanode.
+   */
+  public static class CorruptedBlocks {
+    private Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap;
+
+    public CorruptedBlocks() {
+      this.corruptionMap = new HashMap<>();
+    }
+
+    /**
+     * Indicate a block replica on the specified datanode is corrupted
+     */
+    public void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node) {
+      Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
+      if (dnSet == null) {
+        dnSet = new HashSet<>();
+        corruptionMap.put(blk, dnSet);
+      }
+      if (!dnSet.contains(node)) {
+        dnSet.add(node);
+      }
+    }
+
+    /**
+     * @return the map that contains all the corruption entries.
+     */
+    public Map<ExtendedBlock, Set<DatanodeInfo>> getCorruptionMap() {
+      return corruptionMap;
+    }
+  }
 }
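
The new container also de-duplicates reports: addCorruptedBlock keeps one Set<DatanodeInfo> per ExtendedBlock, so recording the same replica repeatedly leaves a single entry in the map returned by getCorruptionMap(). A minimal sketch of that behavior, assuming the caller already has a block and datanode in hand:

  import java.util.Map;
  import java.util.Set;

  import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
  import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

  class CorruptedBlocksDedupSketch {
    // Recording the same (block, datanode) pair twice keeps a single entry,
    // so the eventual report names each corrupted replica once.
    static int entriesFor(CorruptedBlocks corruptedBlocks,
        ExtendedBlock blk, DatanodeInfo node) {
      corruptedBlocks.addCorruptedBlock(blk, node);
      corruptedBlocks.addCorruptedBlock(blk, node); // no duplicate is added
      Map<ExtendedBlock, Set<DatanodeInfo>> map =
          corruptedBlocks.getCorruptionMap();
      return map.get(blk).size(); // 1
    }
  }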

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8808779d/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1169e66..e3990ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -443,6 +443,9 @@ Trunk (Unreleased)
     HDFS-9837. BlockManager#countNodes should be able to detect duplicated
     internal blocks. (jing9)
 
+    HDFS-9734. Refactoring of checksum failure report related codes.
+    (Kai Zheng via zhz)
+
     BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
       HDFS-7347. Configurable erasure coding policy for individual files and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8808779d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index aed3eaa..b347129 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1150,7 +1150,21 @@ public class DataNode extends ReconfigurableBase
     BPOfferService bpos = getBPOSForBlock(block);
     bpos.reportRemoteBadBlock(srcDataNode, block);
   }
-  
+
+  public void reportCorruptedBlocks(
+      DFSUtilClient.CorruptedBlocks corruptedBlocks) throws IOException {
+    Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap =
+        corruptedBlocks.getCorruptionMap();
+    if (!corruptionMap.isEmpty()) {
+      for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
+          corruptionMap.entrySet()) {
+        for (DatanodeInfo dnInfo : entry.getValue()) {
+          reportRemoteBadBlock(dnInfo, entry.getKey());
+        }
+      }
+    }
+  }
+
   /**
    * Try to send an error report to the NNs associated with the given
    * block pool.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8808779d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
index 1017e1e..bde8d80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java
@@ -32,10 +32,8 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
@@ -54,6 +52,7 @@ import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSPacket;
 import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
 import org.apache.hadoop.hdfs.RemoteBlockReader2;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -460,13 +459,13 @@ public final class ErasureCodingWorker {
               bufferSize, maxTargetLength - positionInBlock);
           // step1: read from minimum source DNs required for reconstruction.
           // The returned success list is the source DNs we do real read from
-          Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>();
+          CorruptedBlocks corruptedBlocks = new CorruptedBlocks();
           try {
             success = readMinimumStripedData4Reconstruction(success,
-                toReconstruct, corruptionMap);
+                toReconstruct, corruptedBlocks);
           } finally {
             // report corrupted blocks to NN
-            reportCorruptedBlocks(corruptionMap);
+            datanode.reportCorruptedBlocks(corruptedBlocks);
           }
 
           // step2: decode to reconstruct targets
@@ -564,8 +563,7 @@ public final class ErasureCodingWorker {
      * @throws IOException
      */
     private int[] readMinimumStripedData4Reconstruction(final int[] success,
-        int reconstructLength,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap)
+        int reconstructLength, CorruptedBlocks corruptedBlocks)
             throws IOException {
       Preconditions.checkArgument(reconstructLength >= 0 &&
           reconstructLength <= bufferSize);
@@ -582,7 +580,7 @@ public final class ErasureCodingWorker {
             reconstructLength);
         if (toRead > 0) {
           Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
-              toRead, corruptionMap);
+              toRead, corruptedBlocks);
           Future<Void> f = readService.submit(readCallable);
           futures.put(f, success[i]);
         } else {
@@ -608,11 +606,11 @@ public final class ErasureCodingWorker {
             IOUtils.closeStream(failedReader.blockReader);
             failedReader.blockReader = null;
             resultIndex = scheduleNewRead(used, reconstructLength,
-                corruptionMap);
+                corruptedBlocks);
           } else if (result.state == StripingChunkReadResult.TIMEOUT) {
             // If timeout, we also schedule a new read.
             resultIndex = scheduleNewRead(used, reconstructLength,
-                corruptionMap);
+                corruptedBlocks);
           }
           if (resultIndex >= 0) {
             newSuccess[nsuccess++] = resultIndex;
@@ -723,7 +721,7 @@ public final class ErasureCodingWorker {
      * @return the array index of source DN if don't need to do real read.
      */
     private int scheduleNewRead(BitSet used, int reconstructLen,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
+                                CorruptedBlocks corruptedBlocks) {
       StripedReader reader = null;
       // step1: initially we may only have <code>minRequiredSources</code>
       // number of StripedReader, and there may be some source DNs we never 
@@ -775,7 +773,7 @@ public final class ErasureCodingWorker {
       // step3: schedule if find a correct source DN and need to do real read.
       if (reader != null) {
         Callable<Void> readCallable = readFromBlock(reader, reader.buffer,
-            toRead, corruptionMap);
+            toRead, corruptedBlocks);
         Future<Void> f = readService.submit(readCallable);
         futures.put(f, m);
         used.set(m);
@@ -793,7 +791,7 @@ public final class ErasureCodingWorker {
 
     private Callable<Void> readFromBlock(final StripedReader reader,
         final ByteBuffer buf, final int length,
-        final Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
+        final CorruptedBlocks corruptedBlocks) {
       return new Callable<Void>() {
 
         @Override
@@ -805,7 +803,7 @@ public final class ErasureCodingWorker {
           } catch (ChecksumException e) {
             LOG.warn("Found Checksum error for {} from {} at {}", reader.block,
                 reader.source, e.getPos());
-            addCorruptedBlock(reader.block, reader.source, corruptionMap);
+            corruptedBlocks.addCorruptedBlock(reader.block, reader.source);
             throw e;
           } catch (IOException e) {
             LOG.info(e.getMessage());
@@ -816,30 +814,6 @@ public final class ErasureCodingWorker {
       };
     }
 
-    private void reportCorruptedBlocks(
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) throws IOException {
-      if (!corruptionMap.isEmpty()) {
-        for (Map.Entry<ExtendedBlock, Set<DatanodeInfo>> entry :
-            corruptionMap.entrySet()) {
-          for (DatanodeInfo dnInfo : entry.getValue()) {
-            datanode.reportRemoteBadBlock(dnInfo, entry.getKey());
-          }
-        }
-      }
-    }
-
-    private void addCorruptedBlock(ExtendedBlock blk, DatanodeInfo node,
-        Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) {
-      Set<DatanodeInfo> dnSet = corruptionMap.get(blk);
-      if (dnSet == null) {
-        dnSet = new HashSet<>();
-        corruptionMap.put(blk, dnSet);
-      }
-      if (!dnSet.contains(node)) {
-        dnSet.add(node);
-      }
-    }
-
     /**
      * Read bytes from block
      */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8808779d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 8241882..38ca8ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -217,7 +217,7 @@ public class TestReconstructStripedFile {
     return d;
   }
 
-  private void shutdownDataNodes(DataNode dn) throws IOException {
+  private void shutdownDataNode(DataNode dn) throws IOException {
     /*
      * Kill the datanode which contains one replica
      * We need to make sure it dead in namenode: clear its update time and
@@ -237,7 +237,7 @@ public class TestReconstructStripedFile {
         // stop at least one DN to trigger reconstruction
         LOG.info("Note: stop DataNode " + target.getValue().getDisplayName()
             + " with internal block " + target.getKey());
-        shutdownDataNodes(target.getValue());
+        shutdownDataNode(target.getValue());
         stoppedDN++;
       } else { // corrupt the data on the DN
         LOG.info("Note: corrupt data on " + target.getValue().getDisplayName()