You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2014/10/28 20:27:21 UTC
git commit: HDFS-6663. Admin command to track file and locations from
block id. Contributed by Chen He.
Repository: hadoop
Updated Branches:
refs/heads/trunk 0d3e7e2bd -> 371a3b87e
HDFS-6663. Admin command to track file and locations from block id.
Contributed by Chen He.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/371a3b87
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/371a3b87
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/371a3b87
Branch: refs/heads/trunk
Commit: 371a3b87ed346732ed58a4faab0c6c1db57c86ed
Parents: 0d3e7e2
Author: Kihwal Lee <ki...@apache.org>
Authored: Tue Oct 28 14:26:04 2014 -0500
Committer: Kihwal Lee <ki...@apache.org>
Committed: Tue Oct 28 14:26:04 2014 -0500
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/blockmanagement/BlockManager.java | 7 +
.../blockmanagement/CorruptReplicasMap.java | 23 +-
.../hdfs/server/namenode/NamenodeFsck.java | 106 +++++++-
.../org/apache/hadoop/hdfs/tools/DFSck.java | 21 +-
.../hadoop/hdfs/server/namenode/TestFsck.java | 249 ++++++++++++++++++-
6 files changed, 394 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 13b2e9f..f704efc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -262,6 +262,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7278. Add a command that allows sysadmins to manually trigger full
block reports from a DN (cmccabe)
+ HDFS-6663. Admin command to track file and locations from block id.
+ (Chen He via kihwal)
+
IMPROVEMENTS
HDFS-7055. Add tracing to DFSInputStream (cmccabe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 98388d4..9b2ce90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3505,6 +3505,13 @@ public class BlockManager {
return corruptReplicas.getNodes(block);
}
+ /**
+ * Get the reason recorded for the corrupt replica of a given block on a given dn.
+ */
+ public String getCorruptReason(Block block, DatanodeDescriptor node) {
+ return corruptReplicas.getCorruptReason(block, node);
+ }
+
/** @return the size of UnderReplicatedBlocks */
public int numOfUnderReplicatedBlocks() {
return neededReplications.size();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
index 9b7515d..764f25d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
@@ -223,5 +223,26 @@ public class CorruptReplicasMap{
}
return ret;
- }
+ }
+
+ /**
+ * Return the reason recorded for the corrupted replica of a given block
+ * on a given dn.
+ * @param block block that has corrupted replica
+ * @param node datanode that contains this corrupted replica
+ * @return the reason as a string, or null if none is recorded for this block and node
+ */
+ String getCorruptReason(Block block, DatanodeDescriptor node) {
+ Reason reason = null;
+ if(corruptReplicasMap.containsKey(block)) {
+ if (corruptReplicasMap.get(block).containsKey(node)) {
+ reason = corruptReplicasMap.get(block).get(node);
+ }
+ }
+ if (reason != null) {
+ return reason.toString();
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index a187123..f82f0ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -59,8 +60,12 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -103,6 +108,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
// return string marking fsck status
public static final String CORRUPT_STATUS = "is CORRUPT";
public static final String HEALTHY_STATUS = "is HEALTHY";
+ public static final String DECOMMISSIONING_STATUS = "is DECOMMISSIONING";
+ public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED";
public static final String NONEXISTENT_STATUS = "does not exist";
public static final String FAILURE_STATUS = "FAILED";
@@ -143,7 +150,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
*/
private boolean doDelete = false;
- private String path = "/";
+ String path = "/";
+
+ private String blockIds = null;
// We return back N files that are corrupt; the list of files returned is
// ordered by block id; to allow continuation support, pass in the last block
@@ -195,21 +204,112 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
else if (key.equals("listcorruptfileblocks")) {
this.showCorruptFileBlocks = true;
- }
- else if (key.equals("startblockafter")) {
+ } else if (key.equals("startblockafter")) {
this.currentCookie[0] = pmap.get("startblockafter")[0];
} else if (key.equals("includeSnapshots")) {
this.snapshottableDirs = new ArrayList<String>();
+ } else if (key.equals("blockId")) {
+ this.blockIds = pmap.get("blockId")[0];
}
}
}
/**
+ * Print the owning file, replica counts and per-datanode replica status
+ * for the block identified by the given blockId.
+ */
+ public void blockIdCK(String blockId) {
+
+ if(blockId == null) {
+ out.println("Please provide valid blockId!");
+ return;
+ }
+
+ BlockManager bm = namenode.getNamesystem().getBlockManager();
+ try {
+ //get blockInfo
+ Block block = new Block(Block.getBlockId(blockId));
+ //find which file this block belongs to
+ BlockInfo blockInfo = bm.getStoredBlock(block);
+ if(blockInfo == null) {
+ out.println("Block "+ blockId +" " + NONEXISTENT_STATUS);
+ LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS);
+ return;
+ }
+ BlockCollection bc = bm.getBlockCollection(blockInfo);
+ INode iNode = (INode) bc;
+ NumberReplicas numberReplicas= bm.countNodes(block);
+ out.println("Block Id: " + blockId);
+ out.println("Block belongs to: "+iNode.getFullPathName());
+ out.println("No. of Expected Replica: " + bc.getBlockReplication());
+ out.println("No. of live Replica: " + numberReplicas.liveReplicas());
+ out.println("No. of excess Replica: " + numberReplicas.excessReplicas());
+ out.println("No. of stale Replica: " + numberReplicas.replicasOnStaleNodes());
+ out.println("No. of decommission Replica: "
+ + numberReplicas.decommissionedReplicas());
+ out.println("No. of corrupted Replica: " + numberReplicas.corruptReplicas());
+ //record datanodes that have corrupted block replica
+ Collection<DatanodeDescriptor> corruptionRecord = null;
+ if (bm.getCorruptReplicas(block) != null) {
+ corruptionRecord = bm.getCorruptReplicas(block);
+ }
+
+ //report block replicas status on datanodes
+ for(int idx = (blockInfo.numNodes()-1); idx >= 0; idx--) {
+ DatanodeDescriptor dn = blockInfo.getDatanode(idx);
+ out.print("Block replica on datanode/rack: " + dn.getHostName() +
+ dn.getNetworkLocation() + " ");
+ if (corruptionRecord != null && corruptionRecord.contains(dn)) {
+ out.print(CORRUPT_STATUS+"\t ReasonCode: "+
+ bm.getCorruptReason(block,dn));
+ } else if (dn.isDecommissioned() ){
+ out.print(DECOMMISSIONED_STATUS);
+ } else if (dn.isDecommissionInProgress()) {
+ out.print(DECOMMISSIONING_STATUS);
+ } else {
+ out.print(HEALTHY_STATUS);
+ }
+ out.print("\n");
+ }
+ } catch (Exception e){
+ String errMsg = "Fsck on blockId '" + blockId;
+ LOG.warn(errMsg, e);
+ out.println(e.getMessage());
+ out.print("\n\n" + errMsg);
+ LOG.warn("Error in looking up block", e);
+ }
+ }
+
+ /**
* Check files on DFS, starting from the indicated path.
*/
public void fsck() {
final long startTime = Time.now();
try {
+ if(blockIds != null) {
+
+ String[] blocks = blockIds.split(" ");
+ StringBuilder sb = new StringBuilder();
+ sb.append("FSCK started by " +
+ UserGroupInformation.getCurrentUser() + " from " +
+ remoteAddress + " at " + new Date());
+ out.println(sb.toString());
+ sb.append(" for blockIds: \n");
+ for (String blk: blocks) {
+ if(blk == null || !blk.contains("blk_")) {
+ out.println("Incorrect blockId format: " + blk);
+ continue;
+ }
+ out.print("\n");
+ blockIdCK(blk);
+ sb.append(blk + "\n");
+ }
+ LOG.info(sb.toString());
+ namenode.getNamesystem().logFsckEvent("/", remoteAddress);
+ out.flush();
+ return;
+ }
+
String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
+ " from " + remoteAddress + " for path " + path + " at " + new Date();
LOG.info(msg);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
index 31ab596..0d73b43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
@@ -92,7 +92,10 @@ public class DFSck extends Configured implements Tool {
+ "\t-blocks\tprint out block report\n"
+ "\t-locations\tprint out locations for every block\n"
+ "\t-racks\tprint out network topology for data-node locations\n"
- + "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n\n"
+ + "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n"
+ + "\t-blockId\tprint out which file this blockId belongs to, locations"
+ + " (nodes, racks) of this block, and other diagnostics info"
+ + " (under replicated, corrupted or not, etc)\n\n"
+ "Please Note:\n"
+ "\t1. By default fsck ignores files opened for write, "
+ "use -openforwrite to report such files. They are usually "
@@ -278,6 +281,15 @@ public class DFSck extends Configured implements Tool {
doListCorruptFileBlocks = true;
} else if (args[idx].equals("-includeSnapshots")) {
url.append("&includeSnapshots=1");
+ } else if (args[idx].equals("-blockId")) {
+ StringBuilder sb = new StringBuilder();
+ idx++;
+ while(idx < args.length && !args[idx].startsWith("-")){
+ sb.append(args[idx]);
+ sb.append(" ");
+ idx++;
+ }
+ url.append("&blockId=").append(URLEncoder.encode(sb.toString(), "UTF-8"));
} else if (!args[idx].startsWith("-")) {
if (null == dir) {
dir = args[idx];
@@ -287,6 +299,7 @@ public class DFSck extends Configured implements Tool {
printUsage(System.err);
return -1;
}
+
} else {
System.err.println("fsck: Illegal option '" + args[idx] + "'");
printUsage(System.err);
@@ -327,6 +340,12 @@ public class DFSck extends Configured implements Tool {
errCode = 1;
} else if (lastLine.endsWith(NamenodeFsck.NONEXISTENT_STATUS)) {
errCode = 0;
+ } else if (lastLine.contains("Incorrect blockId format:")) {
+ errCode = 0;
+ } else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONED_STATUS)) {
+ errCode = 2;
+ } else if (lastLine.endsWith(NamenodeFsck.DECOMMISSIONING_STATUS)) {
+ errCode = 3;
}
return errCode;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/371a3b87/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
index e16df16..ef7de0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
@@ -18,13 +18,6 @@
package org.apache.hadoop.hdfs.server.namenode;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -48,6 +41,7 @@ import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.collect.Sets;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -66,11 +60,14 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -86,8 +83,17 @@ import org.apache.log4j.PatternLayout;
import org.apache.log4j.RollingFileAppender;
import org.junit.Test;
-import com.google.common.collect.Sets;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* A JUnit test for doing fsck
@@ -118,7 +124,7 @@ public class TestFsck {
System.getProperty("line.separator");
static String runFsck(Configuration conf, int expectedErrCode,
- boolean checkErrorCode,String... path)
+ boolean checkErrorCode,String... path)
throws Exception {
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
@@ -1096,4 +1102,227 @@ public class TestFsck {
cluster.shutdown();
}
}
+
+ /**
+ * Test for blockIdCK
+ */
+
+ @Test
+ public void testBlockIdCK() throws Exception {
+
+ final short REPL_FACTOR = 2;
+ short NUM_DN = 2;
+ final long blockSize = 512;
+
+ String [] racks = {"/rack1", "/rack2"};
+ String [] hosts = {"host1", "host2"};
+
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+
+ MiniDFSCluster cluster = null;
+ DistributedFileSystem dfs = null;
+ cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+ .racks(racks).build();
+
+ assertNotNull("Failed Cluster Creation", cluster);
+ cluster.waitClusterUp();
+ dfs = cluster.getFileSystem();
+ assertNotNull("Failed to get FileSystem", dfs);
+
+ DFSTestUtil util = new DFSTestUtil.Builder().
+ setName(getClass().getSimpleName()).setNumFiles(1).build();
+ //create files
+ final String pathString = new String("/testfile");
+ final Path path = new Path(pathString);
+ util.createFile(dfs, path, 1024, REPL_FACTOR , 1000L);
+ util.waitReplication(dfs, path, REPL_FACTOR);
+ StringBuilder sb = new StringBuilder();
+ for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+ sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+ }
+ String[] bIds = sb.toString().split(" ");
+
+ //run fsck
+ try {
+ //illegal input test
+ String runFsckResult = runFsck(conf, 0, true, "/", "-blockId",
+ "not_a_block_id");
+ assertTrue(runFsckResult.contains("Incorrect blockId format:"));
+
+ //general test
+ runFsckResult = runFsck(conf, 0, true, "/", "-blockId", sb.toString());
+ assertTrue(runFsckResult.contains(bIds[0]));
+ assertTrue(runFsckResult.contains(bIds[1]));
+ assertTrue(runFsckResult.contains(
+ "Block replica on datanode/rack: host1/rack1 is HEALTHY"));
+ assertTrue(runFsckResult.contains(
+ "Block replica on datanode/rack: host2/rack2 is HEALTHY"));
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test for blockIdCK with datanode decommission
+ */
+ @Test
+ public void testBlockIdCKDecommission() throws Exception {
+
+ final short REPL_FACTOR = 1;
+ short NUM_DN = 2;
+ final long blockSize = 512;
+ boolean checkDecommissionInProgress = false;
+ String [] racks = {"/rack1", "/rack2"};
+ String [] hosts = {"host1", "host2"};
+
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+
+ MiniDFSCluster cluster;
+ DistributedFileSystem dfs ;
+ cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+ .racks(racks).build();
+
+ assertNotNull("Failed Cluster Creation", cluster);
+ cluster.waitClusterUp();
+ dfs = cluster.getFileSystem();
+ assertNotNull("Failed to get FileSystem", dfs);
+
+ DFSTestUtil util = new DFSTestUtil.Builder().
+ setName(getClass().getSimpleName()).setNumFiles(1).build();
+ //create files
+ final String pathString = new String("/testfile");
+ final Path path = new Path(pathString);
+ util.createFile(dfs, path, 1024, REPL_FACTOR, 1000L);
+ util.waitReplication(dfs, path, REPL_FACTOR);
+ StringBuilder sb = new StringBuilder();
+ for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+ sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+ }
+ String[] bIds = sb.toString().split(" ");
+ try {
+ //make sure datanode that has replica is fine before decommission
+ String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
+ System.out.println(outStr);
+ assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+ //decommission datanode
+ ExtendedBlock eb = util.getFirstBlock(dfs, path);
+ DatanodeDescriptor dn = cluster.getNameNode().getNamesystem()
+ .getBlockManager().getBlockCollection(eb.getLocalBlock())
+ .getBlocks()[0].getDatanode(0);
+ cluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().startDecommission(dn);
+ String dnName = dn.getXferAddr();
+
+ //wait for decommission to finish, checking the in-progress state on the way
+ DatanodeInfo datanodeInfo = null;
+ int count = 0;
+ do {
+ Thread.sleep(2000);
+ for (DatanodeInfo info : dfs.getDataNodeStats()) {
+ if (dnName.equals(info.getXferAddr())) {
+ datanodeInfo = info;
+ }
+ }
+ //check decommissioning only once
+ if(!checkDecommissionInProgress && datanodeInfo != null
+ && datanodeInfo.isDecommissionInProgress()) {
+ String fsckOut = runFsck(conf, 3, true, "/", "-blockId", bIds[0]);
+ assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONING_STATUS));
+ checkDecommissionInProgress = true;
+ }
+ } while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
+
+ //check decommissioned
+ String fsckOut = runFsck(conf, 2, true, "/", "-blockId", bIds[0]);
+ assertTrue(fsckOut.contains(NamenodeFsck.DECOMMISSIONED_STATUS));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Test for blockIdCK with block corruption
+ */
+ @Test
+ public void testBlockIdCKCorruption() throws Exception {
+ short NUM_DN = 1;
+ final long blockSize = 512;
+ Random random = new Random();
+ DFSClient dfsClient;
+ LocatedBlocks blocks;
+ ExtendedBlock block;
+ short repFactor = 1;
+ String [] racks = {"/rack1"};
+ String [] hosts = {"host1"};
+
+ Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
+ // Set short retry timeouts so this test runs faster
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+
+ MiniDFSCluster cluster = null;
+ DistributedFileSystem dfs = null;
+ try {
+ cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts)
+ .racks(racks).build();
+
+ assertNotNull("Failed Cluster Creation", cluster);
+ cluster.waitClusterUp();
+ dfs = cluster.getFileSystem();
+ assertNotNull("Failed to get FileSystem", dfs);
+
+ DFSTestUtil util = new DFSTestUtil.Builder().
+ setName(getClass().getSimpleName()).setNumFiles(1).build();
+ //create files
+ final String pathString = new String("/testfile");
+ final Path path = new Path(pathString);
+ util.createFile(dfs, path, 1024, repFactor, 1000L);
+ util.waitReplication(dfs, path, repFactor);
+ StringBuilder sb = new StringBuilder();
+ for (LocatedBlock lb: util.getAllBlocks(dfs, path)){
+ sb.append(lb.getBlock().getLocalBlock().getBlockName()+" ");
+ }
+ String[] bIds = sb.toString().split(" ");
+
+ //make sure block is healthy before we corrupt it
+ String outStr = runFsck(conf, 0, true, "/", "-blockId", bIds[0]);
+ System.out.println(outStr);
+ assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+ // corrupt replicas
+ block = DFSTestUtil.getFirstBlock(dfs, path);
+ File blockFile = MiniDFSCluster.getBlockFile(0, block);
+ if (blockFile != null && blockFile.exists()) {
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+ FileChannel channel = raFile.getChannel();
+ String badString = "BADBAD";
+ int rand = random.nextInt((int) channel.size()/2);
+ raFile.seek(rand);
+ raFile.write(badString.getBytes());
+ raFile.close();
+ }
+
+ util.waitCorruptReplicas(dfs, cluster.getNamesystem(), path, block, 1);
+
+ outStr = runFsck(conf, 1, false, "/", "-blockId", block.getBlockName());
+ System.out.println(outStr);
+ assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}