You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2014/10/28 20:27:21 UTC

git commit: HDFS-6663. Admin command to track file and locations from block id. Contributed by Chen He.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0d3e7e2bd -> 371a3b87e


HDFS-6663. Admin command to track file and locations from block id.
Contributed by Chen He.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/371a3b87
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/371a3b87
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/371a3b87

Branch: refs/heads/trunk
Commit: 371a3b87ed346732ed58a4faab0c6c1db57c86ed
Parents: 0d3e7e2
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Oct 28 14:26:04 2014 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Oct 28 14:26:04 2014 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |   7 +
 .../blockmanagement/CorruptReplicasMap.java     |  23 +-
 .../hdfs/server/namenode/NamenodeFsck.java      | 106 +++++++-
 .../org/apache/hadoop/hdfs/tools/DFSck.java     |  21 +-
 .../hadoop/hdfs/server/namenode/TestFsck.java   | 249 ++++++++++++++++++-
 6 files changed, 394 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 13b2e9f..f704efc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -262,6 +262,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7278. Add a command that allows sysadmins to manually trigger full
     block reports from a DN (cmccabe)
 
+    HDFS-6663. Admin command to track file and locations from block id.
+    (Chen He via kihwal)
+
   IMPROVEMENTS
 
     HDFS-7055. Add tracing to DFSInputStream (cmccabe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 98388d4..9b2ce90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3505,6 +3505,13 @@ public class BlockManager {
     return corruptReplicas.getNodes(block);
   }
 
+  /**
+   * Get the reason why the replica of a given block on a given dn is corrupt.
+   */
+  public String getCorruptReason(Block block, DatanodeDescriptor node) {
+    return corruptReplicas.getCorruptReason(block, node);
+  }
+
   /** @return the size of UnderReplicatedBlocks */
   public int numOfUnderReplicatedBlocks() {
     return neededReplications.size();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
index 9b7515d..764f25d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
@@ -223,5 +223,26 @@ public class CorruptReplicasMap{
     }
     
     return ret;
-  }  
+  }
+
+  /**
+   * Look up why the replica of a block on a particular datanode was marked
+   * corrupt.
+   *
+   * @param block block that has a corrupted replica
+   * @param node datanode that holds the corrupted replica
+   * @return string form of the recorded reason, or null when the block has
+   *         no corrupt-replica entry, the datanode is not listed for it, or
+   *         the recorded reason is null
+   */
+  String getCorruptReason(Block block, DatanodeDescriptor node) {
+    if (!corruptReplicasMap.containsKey(block)) {
+      return null;
+    }
+    if (!corruptReplicasMap.get(block).containsKey(node)) {
+      return null;
+    }
+    Reason why = corruptReplicasMap.get(block).get(node);
+    return (why == null) ? null : why.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index a187123..f82f0ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.RemotePeerFactory;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -59,8 +60,12 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -103,6 +108,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   // return string marking fsck status
   public static final String CORRUPT_STATUS = "is CORRUPT";
   public static final String HEALTHY_STATUS = "is HEALTHY";
+  public static final String DECOMMISSIONING_STATUS = "is DECOMMISSIONING";
+  public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED";
   public static final String NONEXISTENT_STATUS = "does not exist";
   public static final String FAILURE_STATUS = "FAILED";
   
@@ -143,7 +150,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
    */
   private boolean doDelete = false;
 
-  private String path = "/";
+  String path = "/";
+
+  private String blockIds = null;
 
   // We return back N files that are corrupt; the list of files returned is
   // ordered by block id; to allow continuation support, pass in the last block
@@ -195,21 +204,112 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
       else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
       else if (key.equals("listcorruptfileblocks")) {
         this.showCorruptFileBlocks = true;
-      }
-      else if (key.equals("startblockafter")) {
+      } else if (key.equals("startblockafter")) {
         this.currentCookie[0] = pmap.get("startblockafter")[0];
       } else if (key.equals("includeSnapshots")) {
         this.snapshottableDirs = new ArrayList<String>();
+      } else if (key.equals("blockId")) {
+        this.blockIds = pmap.get("blockId")[0];
       }
     }
   }
 
   /**
+   * Print the owning file, replica counts and per-datanode replica status
+   * for the block named by blockId to the fsck output.
+   */
+  public void blockIdCK(String blockId) {
+
+    if(blockId == null) {
+      out.println("Please provide valid blockId!");
+      return;
+    }
+
+    BlockManager bm = namenode.getNamesystem().getBlockManager();
+    try {
+      //get blockInfo
+      Block block = new Block(Block.getBlockId(blockId));
+      //find which file this block belongs to
+      BlockInfo blockInfo = bm.getStoredBlock(block);
+      if(blockInfo == null) {
+        out.println("Block "+ blockId +" " + NONEXISTENT_STATUS);
+        LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS);
+        return;
+      }
+      BlockCollection bc = bm.getBlockCollection(blockInfo);
+      INode iNode = (INode) bc;
+      NumberReplicas numberReplicas= bm.countNodes(block);
+      out.println("Block Id: " + blockId);
+      out.println("Block belongs to: "+iNode.getFullPathName());
+      out.println("No. of Expected Replica: " + bc.getBlockReplication());
+      out.println("No. of live Replica: " + numberReplicas.liveReplicas());
+      out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
+      out.println("No. of stale Replica: " + numberReplicas.replicasOnStaleNodes());
+      out.println("No. of decommission Replica: "
+          + numberReplicas.decommissionedReplicas());
+      out.println("No. of corrupted Replica: " + numberReplicas.corruptReplicas());
+      //record datanodes that have corrupted block replica; looked up once,
+      //the result may be null and is null-checked where it is used below
+      Collection<DatanodeDescriptor> corruptionRecord =
+          bm.getCorruptReplicas(block);
+
+      //report block replicas status on datanodes
+      for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) {
+        DatanodeDescriptor dn = blockInfo.getDatanode(idx);
+        out.print("Block replica on datanode/rack: " + dn.getHostName() +
+            dn.getNetworkLocation() + " ");
+        if (corruptionRecord != null && corruptionRecord.contains(dn)) {
+          out.print(CORRUPT_STATUS+"\t ReasonCode: "+
+            bm.getCorruptReason(block,dn));
+        } else if (dn.isDecommissioned() ){
+          out.print(DECOMMISSIONED_STATUS);
+        } else if (dn.isDecommissionInProgress()) {
+          out.print(DECOMMISSIONING_STATUS);
+        } else {
+          out.print(HEALTHY_STATUS);
+        }
+        out.print("\n");
+      }
+    } catch (Exception e){
+      //report the failure on the output stream and log it once, with the
+      //stack trace
+      String errMsg = "Fsck on blockId '" + blockId + "' " + FAILURE_STATUS;
+      LOG.warn(errMsg, e);
+      out.println(e.getMessage());
+      out.print("\n\n" + errMsg);
+    }
+  }
+
+  /**
    * Check files on DFS, starting from the indicated path.
    */
   public void fsck() {
     final long startTime = Time.now();
     try {
+      if(blockIds != null) {
+
+        String[] blocks = blockIds.split(" ");
+        StringBuilder sb = new StringBuilder();
+        sb.append("FSCK started by " +
+            UserGroupInformation.getCurrentUser() + " from " +
+            remoteAddress + " at " + new Date());
+        out.println(sb.toString());
+        sb.append(" for blockIds: \n");
+        for (String blk: blocks) {
+          if(blk == null || !blk.contains("blk_")) {
+            out.println("Incorrect blockId format: " + blk);
+            continue;
+          }
+          out.print("\n");
+          blockIdCK(blk);
+          sb.append(blk + "\n");
+        }
+        LOG.info(sb.toString());
+        namenode.getNamesystem().logFsckEvent("/", remoteAddress);
+        out.flush();
+        return;
+      }
+
       String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
           + " from " + remoteAddress + " for path " + path + " at " + new Date();
       LOG.info(msg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
index 31ab596..0d73b43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
@@ -92,7 +92,10 @@ public class DFSck extends Configured implements Tool {
       + "\t-blocks\tprint out block report\n"
       + "\t-locations\tprint out locations for every block\n"
       + "\t-racks\tprint out network topology for data-node locations\n"
-      + "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n\n"
+      + "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n"
+      + "\t-blockId\tprint out which file this blockId belongs to, locations"
+      + " (nodes, racks) of this block, and other diagnostics info"
+      + " (under replicated, corrupted or not, etc)\n\n"
       + "Please Note:\n"
       + "\t1. By default fsck ignores files opened for write, "
       + "use -openforwrite to report such files. They are usually "
@@ -278,6 +281,15 @@ public class DFSck extends Configured implements Tool {
         doListCorruptFileBlocks = true;
       } else if (args[idx].equals("-includeSnapshots")) {
         url.append("&includeSnapshots=1");
+      } else if (args[idx].equals("-blockId")) {
+        StringBuilder sb = new StringBuilder();
+        idx++;
+        while(idx < args.length && !args[idx].startsWith("-")){
+          sb.append(args[idx]);
+          sb.append(" ");
+          idx++;
+        }
+        url.append("&blockId=").append(URLEncoder.encode(sb.toString(), "UTF-8"));
       } else if (!args[idx].startsWith("-")) {
         if (null == dir) {
           dir = args[idx];
@@ -287,6 +299,7 @@ public class DFSck extends Configured implements Tool {
           printUsage(System.err);
           return -1;
         }
+
       } else {
         System.err.println("fsck: Illegal option '" + args[idx] + "'");
         printUsage(System.err);
@@ -327,6 +340,12 @@ public class DFSck extends Configured implements Tool {
       errCode = 1;
     } else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) {
       errCode = 0;
+    } else if (lastLine.contains("Incorrect blockId format:")) {
+      errCode = 0;
+    } else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONED_STATUS)) {
+      errCode = 2;
+    } else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONING_STATUS)) {
+      errCode = 3;
     }
     return errCode;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index e16df16..ef7de0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -18,13 +18,6 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -48,6 +41,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -66,11 +60,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -86,8 +83,17 @@ import org.apache.log4j.PatternLayout;
 import org.apache.log4j.RollingFileAppender;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * A JUnit test for doing fsck
@@ -118,7 +124,7 @@ public class TestFsck {
     System.getProperty("line.separator");
 
   static String runFsck(Configuration conf, int expectedErrCode, 
-                        boolean checkErrorCode,String... path) 
+                        boolean checkErrorCode,String... path)
                         throws Exception {
     ByteArrayOutputStream bStream = new ByteArrayOutputStream();
     PrintStream out = new PrintStream(bStream, true);
@@ -1096,4 +1102,227 @@ public class TestFsck {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Test for blockIdCK: a malformed id is rejected and healthy replicas
+   * are reported for every block of a fully replicated file.
+   */
+  @Test
+  public void testBlockIdCK() throws Exception {
+
+    final short REPL_FACTOR = 2;
+    short NUM_DN = 2;
+    final long blockSize = 512;
+
+    String [] racks = {"/rack1", "/rack2"};
+    String [] hosts = {"host1", "host2"};
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+        .racks(racks).build();
+    assertNotNull("Failed Cluster Creation", cluster);
+
+    //everything after start-up runs under try/finally so that the cluster
+    //is shut down even when setup or an assertion fails
+    try {
+      cluster.waitClusterUp();
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      assertNotNull("Failed to get FileSystem", dfs);
+
+      DFSTestUtil util = new DFSTestUtil.Builder().
+        setName(getClass().getSimpleName()).setNumFiles(1).build();
+      //create files
+      final String pathString = "/testfile";
+      final Path path = new Path(pathString);
+      util.createFile(dfs, path, 1024, REPL_FACTOR , 1000L);
+      util.waitReplication(dfs, path, REPL_FACTOR);
+      StringBuilder sb = new StringBuilder();
+      for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+        sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+      }
+      String[] bIds = sb.toString().split(" ");
+
+      //run fsck
+      //illegal input test
+      String runFsckResult = runFsck(conf, 0, true, "/", "-blockId",
+          "not_a_block_id");
+      assertTrue(runFsckResult.contains("Incorrect blockId format:"));
+
+      //general test
+      runFsckResult = runFsck(conf, 0, true, "/", "-blockId", sb.toString());
+      assertTrue(runFsckResult.contains(bIds[0]));
+      assertTrue(runFsckResult.contains(bIds[1]));
+      assertTrue(runFsckResult.contains(
+          "Block replica on datanode/rack: host1/rack1 is HEALTHY"));
+      assertTrue(runFsckResult.contains(
+          "Block replica on datanode/rack: host2/rack2 is HEALTHY"));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test for blockIdCK output and exit codes during datanode decommission
+   */
+  @Test
+  public void testBlockIdCKDecommission() throws Exception {
+
+    final short REPL_FACTOR = 1;
+    short NUM_DN = 2;
+    final long blockSize = 512;
+    boolean checkDecommissionInProgress = false;
+    String [] racks = {"/rack1", "/rack2"};
+    String [] hosts = {"host1", "host2"};
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+
+    MiniDFSCluster cluster;
+    DistributedFileSystem dfs ;
+    cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+            .racks(racks).build();
+
+    assertNotNull("Failed Cluster Creation", cluster);
+    cluster.waitClusterUp();
+    dfs = cluster.getFileSystem();
+    assertNotNull("Failed to get FileSystem", dfs);
+
+    DFSTestUtil util = new DFSTestUtil.Builder().
+        setName(getClass().getSimpleName()).setNumFiles(1).build();
+    //create files
+    final String pathString = new String("/testfile");
+    final Path path = new Path(pathString);
+    util.createFile(dfs, path, 1024, REPL_FACTOR, 1000L);
+    util.waitReplication(dfs, path, REPL_FACTOR);
+    StringBuilder sb = new StringBuilder();
+    for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+      sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+    }
+    String[] bIds = sb.toString().split(" ");
+    try {
+      //the replica's datanode must be reported healthy before decommission
+      String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
+      System.out.println(outStr);
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+      //decommission the datanode that holds the first replica of the block
+      ExtendedBlock eb = util.getFirstBlock(dfs, path);
+      DatanodeDescriptor dn = cluster.getNameNode().getNamesystem()
+          .getBlockManager().getBlockCollection(eb.getLocalBlock())
+          .getBlocks()[0].getDatanode(0);
+      cluster.getNameNode().getNamesystem().getBlockManager()
+          .getDatanodeManager().startDecommission(dn);
+      String dnName = dn.getXferAddr();
+      //wait for decommission start; NOTE(review): the loop has no timeout
+      //and 'count' is never used -- confirm whether a retry bound was meant
+      DatanodeInfo datanodeInfo = null;
+      int count = 0;
+      do {
+        Thread.sleep(2000);
+        for (DatanodeInfo info : dfs.getDataNodeStats()) {
+          if (dnName.equals(info.getXferAddr())) {
+            datanodeInfo = info;
+          }
+        }
+         //check decommissioning only once; fsck is expected to exit with 3
+        if(!checkDecommissionInProgress && datanodeInfo != null
+            && datanodeInfo.isDecommissionInProgress()) {
+          String fsckOut = runFsck(conf, 3, true, "/", "-blockId", bIds[0]);
+          assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONING_STATUS));
+          checkDecommissionInProgress =  true;
+        }
+      } while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
+
+      //check decommissioned; fsck is expected to exit with 2
+      String fsckOut = runFsck(conf, 2, true, "/", "-blockId", bIds[0]);
+      assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONED_STATUS));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test that blockIdCK reports a replica corrupted on disk as CORRUPT
+   */
+  @Test
+  public void testBlockIdCKCorruption() throws Exception {
+    short NUM_DN = 1;
+    final long blockSize = 512;
+    Random random = new Random();
+    ExtendedBlock block;
+    short repFactor = 1;
+    String [] racks = {"/rack1"};
+    String [] hosts = {"host1"};
+
+    Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+    MiniDFSCluster cluster = null;
+    DistributedFileSystem dfs = null;
+    try {
+      cluster =
+          new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+              .racks(racks).build();
+
+      assertNotNull("Failed Cluster Creation", cluster);
+      cluster.waitClusterUp();
+      dfs = cluster.getFileSystem();
+      assertNotNull("Failed to get FileSystem", dfs);
+
+      DFSTestUtil util = new DFSTestUtil.Builder().
+        setName(getClass().getSimpleName()).setNumFiles(1).build();
+      //create files
+      final String pathString = new String("/testfile");
+      final Path path = new Path(pathString);
+      util.createFile(dfs, path, 1024, repFactor, 1000L);
+      util.waitReplication(dfs, path, repFactor);
+      StringBuilder sb = new StringBuilder();
+      for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+        sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+      }
+      String[] bIds = sb.toString().split(" ");
+
+      //make sure block is healthy before we corrupt it
+      String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
+      System.out.println(outStr);
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+      // corrupt replicas
+      block = DFSTestUtil.getFirstBlock(dfs, path);
+      File blockFile = MiniDFSCluster.getBlockFile(0, block);
+      if (blockFile != null && blockFile.exists()) {
+        RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+        try {
+          FileChannel channel = raFile.getChannel();
+          int rand = random.nextInt((int) channel.size()/2);
+          raFile.seek(rand);
+          raFile.write("BADBAD".getBytes());
+        } finally {
+          raFile.close();
+        }
+      }
+
+      util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);
+
+      outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
+      System.out.println(outStr);
+      assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }