You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2011/04/22 19:10:44 UTC
svn commit: r1095963 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Author: eli
Date: Fri Apr 22 17:10:43 2011
New Revision: 1095963
URL: http://svn.apache.org/viewvc?rev=1095963&view=rev
Log:
HDFS-1562. Add rack policy tests. Contributed by Eli Collins
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Apr 22 17:10:43 2011
@@ -130,6 +130,8 @@ Trunk (unreleased changes)
HDFS-1854. make failure message more useful in
DFSTestUtil.waitReplication(). (Matt Foley via eli)
+ HDFS-1562. Add rack policy tests. (eli)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Fri Apr 22 17:10:43 2011
@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Fri Apr 22 17:10:43 2011
@@ -86,9 +86,8 @@ class UnderReplicatedBlocks implements I
int curReplicas,
int decommissionedReplicas,
int expectedReplicas) {
- if (curReplicas<0) {
- return LEVEL;
- } else if (curReplicas>=expectedReplicas) {
+ assert curReplicas >= 0 : "Negative replicas!";
+ if (curReplicas >= expectedReplicas) {
return 3; // Block doesn't have enough racks
} else if(curReplicas==0) {
// If there are zero non-decommissioned replica but there are
@@ -116,9 +115,7 @@ class UnderReplicatedBlocks implements I
int curReplicas,
int decomissionedReplicas,
int expectedReplicas) {
- if(curReplicas<0) {
- return false;
- }
+ assert curReplicas >= 0 : "Negative replicas!";
int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
expectedReplicas);
if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
@@ -159,8 +156,10 @@ class UnderReplicatedBlocks implements I
}
return true;
} else {
+ // Try to remove the block from all queues if the block was
+ // not found in the queue for the given priority level.
for(int i=0; i<LEVEL; i++) {
- if(i!=priLevel && priorityQueues.get(i).remove(block)) {
+ if(priorityQueues.get(i).remove(block)) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -202,7 +201,7 @@ class UnderReplicatedBlocks implements I
"BLOCK* NameSystem.UnderReplicationBlock.update:"
+ block
+ " has only "+curReplicas
- + " replicas and need " + curExpectedReplicas
+ + " replicas and needs " + curExpectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + curPri);
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Apr 22 17:10:43 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -27,6 +28,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLConnection;
@@ -36,6 +39,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -50,15 +54,19 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.junit.Assert;
+
+import static org.junit.Assert.*;
/** Utilities for HDFS tests */
public class DFSTestUtil {
@@ -143,6 +151,12 @@ public class DFSTestUtil {
replicationFactor, files[idx].getSeed());
}
}
+
+  /**
+   * Reads the whole of the given file and returns it as a string.
+   *
+   * @param fs the file system to read from
+   * @param fileName the path of the file to read
+   * @return the file contents, decoded by ByteArrayOutputStream.toString()
+   * @throws IOException if the file cannot be opened or read
+   */
+  public static String readFile(FileSystem fs, Path fileName) throws IOException {
+    ByteArrayOutputStream contents = new ByteArrayOutputStream();
+    InputStream in = fs.open(fileName);
+    // The final 'true' asks copyBytes to close both streams when done.
+    IOUtils.copyBytes(in, contents, 1024, true);
+    return contents.toString();
+  }
public static void createFile(FileSystem fs, Path fileName, long fileLen,
short replFactor, long seed) throws IOException {
@@ -172,8 +186,6 @@ public class DFSTestUtil {
/** check if the files have been copied correctly. */
public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
-
- //Configuration conf = new HdfsConfiguration();
Path root = new Path(topdir);
for (int idx = 0; idx < nFiles; idx++) {
@@ -206,9 +218,10 @@ public class DFSTestUtil {
}
}
- // waits for the replication factor of all files to reach the
- // specified target
- //
+ /*
+ * Waits for the replication factor of all files to reach the
+ * specified target.
+ */
public void waitReplication(FileSystem fs, String topdir, short value)
throws IOException {
Path root = new Path(topdir);
@@ -219,6 +232,128 @@ public class DFSTestUtil {
}
}
+  /**
+   * Asks the namenode for the block locations of the given file and
+   * reports whether the block at the given index is flagged as corrupt.
+   *
+   * @param cluster the cluster whose namenode (on localhost) is queried
+   * @param file the file the block belongs to
+   * @param blockNo index of the block within the file's located blocks
+   * @return the isCorrupt() flag of the located block
+   * @throws IOException on error talking to the namenode
+   */
+  public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
+      Path file, int blockNo) throws IOException {
+    InetSocketAddress nnAddr =
+        new InetSocketAddress("localhost", cluster.getNameNodePort());
+    DFSClient dfsClient = new DFSClient(nnAddr, cluster.getConfiguration());
+    try {
+      LocatedBlocks located = dfsClient.getNamenode().getBlockLocations(
+          file.toString(), 0, Long.MAX_VALUE);
+      return located.get(blockNo).isCorrupt();
+    } finally {
+      // Always release the client, whether or not the lookup succeeded.
+      dfsClient.close();
+    }
+  }
+
+  /**
+   * Polls the namenode once per second, for at most 20 attempts, until
+   * the given block is reported on the requested number of racks, with
+   * the requested number of replicas, and the requested number of
+   * replicas still needed.
+   *
+   * @param cluster the cluster whose namenode is polled
+   * @param b the block to check
+   * @param racks the expected number of racks
+   * @param replicas the expected number of replicas
+   * @param neededReplicas the expected needed-replicas value, as reported
+   *        by NameNodeAdapter.getReplicaInfo
+   * @throws TimeoutException if the expected state has still not been
+   *         observed after the final attempt
+   * @throws InterruptedException if interrupted while sleeping
+   */
+  public static void waitForReplication(MiniDFSCluster cluster, Block b,
+      int racks, int replicas, int neededReplicas)
+      throws IOException, TimeoutException, InterruptedException {
+    int curRacks = 0;
+    int curReplicas = 0;
+    int curNeededReplicas = 0;
+    final int ATTEMPTS = 20;
+
+    for (int count = 0; count < ATTEMPTS; count++) {
+      Thread.sleep(1000);
+      int[] r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b);
+      curRacks = r[0];
+      curReplicas = r[1];
+      curNeededReplicas = r[2];
+      if (curRacks == racks &&
+          curReplicas == replicas &&
+          curNeededReplicas == neededReplicas) {
+        // Reached the expected state. Returning here (instead of testing
+        // the attempt counter afterwards) means success on the very last
+        // attempt is not misreported as a timeout.
+        return;
+      }
+    }
+
+    throw new TimeoutException("Timed out waiting for replication."
+        + " Needed replicas = "+neededReplicas
+        + " Cur needed replicas = "+curNeededReplicas
+        + " Replicas = "+replicas+" Cur replicas = "+curReplicas
+        + " Racks = "+racks+" Cur racks = "+curRacks);
+  }
+
+  /**
+   * Keeps reading the given file until the namenode reports that the
+   * given block has the given number of corrupt replicas, giving up
+   * after 20 read attempts. There is no pause between attempts.
+   *
+   * @param fs the file system used to read the file
+   * @param ns the namesystem queried for the corrupt replica count
+   * @param file the file containing the block
+   * @param b the block whose corrupt replicas are counted
+   * @param corruptRepls the expected number of corrupt replicas
+   * @throws TimeoutException if the reported count still differs from
+   *         the expected one after the final attempt
+   */
+  public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
+      Path file, Block b, int corruptRepls)
+      throws IOException, TimeoutException {
+    int count = 0;
+    final int ATTEMPTS = 20;
+    int repls = ns.numCorruptReplicas(b);
+    while (repls != corruptRepls && count < ATTEMPTS) {
+      try {
+        IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
+            512, true);
+      } catch (IOException e) {
+        // Deliberately ignored: the read is best effort and only serves
+        // to exercise the file; the replica count below is what matters.
+      }
+      System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
+      repls = ns.numCorruptReplicas(b);
+      count++;
+    }
+    // Judge by the observed count, not by the attempt counter, so that
+    // reaching the expected count on the final attempt is not reported
+    // as a timeout.
+    if (repls != corruptRepls) {
+      throw new TimeoutException("Timed out waiting for corrupt replicas."
+          + " Waiting for "+corruptRepls+", but only found "+repls);
+    }
+  }
+
+  /**
+   * Polls the datanode report once per second, for at most 20 attempts,
+   * until the datanode with the given name (host:port) is reported as
+   * decommissioned.
+   *
+   * @param fs the file system; must be a DistributedFileSystem
+   * @param name the datanode name (host:port) to wait for
+   * @throws TimeoutException if the datanode is still not reported as
+   *         decommissioned after the final attempt
+   * @throws InterruptedException if interrupted while sleeping
+   */
+  public static void waitForDecommission(FileSystem fs, String name)
+      throws IOException, InterruptedException, TimeoutException {
+    DatanodeInfo dn = null;
+    final int ATTEMPTS = 20;
+    DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+    for (int count = 0; count < ATTEMPTS; count++) {
+      Thread.sleep(1000);
+      for (DatanodeInfo info : dfs.getDataNodeStats()) {
+        if (name.equals(info.getName())) {
+          dn = info;
+        }
+      }
+      if (dn != null &&
+          !dn.isDecommissionInProgress() &&
+          dn.isDecommissioned()) {
+        // Done. Returning here (instead of testing the attempt counter
+        // afterwards) means success on the very last attempt is not
+        // misreported as a timeout.
+        return;
+      }
+    }
+
+    throw new TimeoutException("Timed out waiting for datanode "
+        + name + " to decommission.");
+  }
+
+  /**
+   * Scans the cluster's datanodes in index order and returns the index
+   * of the first one on which the given block can be read.
+   *
+   * @param cluster the cluster to scan
+   * @param b the block to look for
+   * @return the datanode index, or -1 if no datanode has the block
+   * @throws IOException on error reading a block file
+   */
+  public static int firstDnWithBlock(MiniDFSCluster cluster, Block b)
+      throws IOException {
+    final int dnCount = cluster.getDataNodes().size();
+    int idx = 0;
+    while (idx < dnCount) {
+      // readBlockOnDataNode returns null when the datanode has no copy.
+      if (cluster.readBlockOnDataNode(idx, b.getBlockName()) != null) {
+        return idx;
+      }
+      idx++;
+    }
+    return -1;
+  }
+
/** return list of filenames created as part of createFiles */
public String[] getFileNames(String topDir) {
if (nFiles == 0)
@@ -292,7 +427,7 @@ public class DFSTestUtil {
).getLogger().setLevel(org.apache.log4j.Level.ALL);
}
- static String readFile(File f) throws IOException {
+ public static String readFile(File f) throws IOException {
StringBuilder b = new StringBuilder();
BufferedReader in = new BufferedReader(new FileReader(f));
for(int c; (c = in.read()) != -1; b.append((char)c));
@@ -300,6 +435,17 @@ public class DFSTestUtil {
return b.toString();
}
+  /**
+   * Writes the given string to the given path, first deleting whatever
+   * already exists at that path.
+   *
+   * @param fs the file system to write to
+   * @param p the path of the file to (re)create
+   * @param s the contents to write
+   * @throws IOException on error deleting, creating or writing the file
+   */
+  public static void writeFile(FileSystem fs, Path p, String s)
+      throws IOException {
+    final boolean alreadyThere = fs.exists(p);
+    if (alreadyThere) {
+      fs.delete(p, true);
+    }
+    InputStream source = new ByteArrayInputStream(s.getBytes());
+    FSDataOutputStream sink = fs.create(p);
+    // Buffer sized to the string length; the final 'true' asks copyBytes
+    // to close both streams when done.
+    final int bufferSize = s.length();
+    IOUtils.copyBytes(source, sink, bufferSize, true);
+  }
+
// Returns url content as string.
public static String urlGet(URL url) throws IOException {
URLConnection conn = url.openConnection();
@@ -409,7 +555,7 @@ public class DFSTestUtil {
/** For {@link TestTransferRbw} */
public static DataTransferProtocol.Status transferRbw(final Block b,
final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
- Assert.assertEquals(2, datanodes.length);
+ assertEquals(2, datanodes.length);
final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
datanodes.length, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Apr 22 17:10:43 2011
@@ -878,29 +878,69 @@ public class MiniDFSCluster {
System.out.println("Cluster is active");
}
- /*
- * Corrupt a block on all datanode
+ /**
+ * Return the contents of the given block on the given datanode.
+ *
+ * @param i The index of the datanode
+ * @param blockName The name of the block
+ * @throws IOException on error accessing the file for the given block
+ * @return The contents of the block file, null if none found
*/
- void corruptBlockOnDataNodes(String blockName) throws Exception{
- for (int i=0; i < dataNodes.size(); i++)
- corruptBlockOnDataNode(i,blockName);
+ public String readBlockOnDataNode(int i, String blockName) throws IOException {
+ assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+
+ // Each datanode has multiple data dirs, check each
+ for (int dn = i*2; dn < i*2+2; dn++) {
+ File dataDir = new File(getBaseDirectory() + "data");
+ File blockFile = new File(dataDir,
+ "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+ if (blockFile.exists()) {
+ return DFSTestUtil.readFile(blockFile);
+ }
+ }
+ return null;
}
- /*
- * Corrupt a block on a particular datanode
+  /**
+   * Attempts to corrupt the named block on every datanode in the cluster.
+   *
+   * @param blockName The name of the block
+   * @throws IOException on error accessing the given block.
+   * @return The number of datanodes on which a replica was corrupted.
+   */
+  public int corruptBlockOnDataNodes(String blockName) throws IOException {
+    int corruptedCount = 0;
+    int dnIndex = 0;
+    while (dnIndex < dataNodes.size()) {
+      // corruptReplica reports whether this datanode held a replica.
+      boolean corrupted = corruptReplica(blockName, dnIndex);
+      corruptedCount += corrupted ? 1 : 0;
+      dnIndex++;
+    }
+    return corruptedCount;
+  }
+
+ /**
+ * Corrupt a block on a particular datanode.
+ *
+ * @param blockName The name of the block
+ * @param i The index of the datanode
+ * @throws IOException on error accessing the given block or if
+ * the contents of the block (on the same datanode) differ.
+ * @return true if a replica was corrupted, false otherwise
*/
- boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
+ public boolean corruptReplica(String blockName, int i) throws IOException {
Random random = new Random();
- boolean corrupted = false;
- File dataDir = new File(getBaseDirectory() + "data");
- if (i < 0 || i >= dataNodes.size())
- return false;
+ assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+ int filesCorrupted = 0;
+
+ // Each datanode has multiple data dirs, check each
for (int dn = i*2; dn < i*2+2; dn++) {
- File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
- blockName);
- System.out.println("Corrupting for: " + blockFile);
- if (blockFile.exists()) {
- // Corrupt replica by writing random bytes into replica
+ File dataDir = new File(getBaseDirectory() + "data");
+ File blockFile = new File(dataDir,
+ "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+
+ // Corrupt the replica by writing some bytes into a random offset
+ if (blockFile.exists()) {
+ System.out.println("Corrupting " + blockFile);
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
FileChannel channel = raFile.getChannel();
String badString = "BADBAD";
@@ -908,10 +948,12 @@ public class MiniDFSCluster {
raFile.seek(rand);
raFile.write(badString.getBytes());
raFile.close();
+ filesCorrupted++;
}
- corrupted = true;
}
- return corrupted;
+ assert filesCorrupted == 0 || filesCorrupted == 1
+ : "Unexpected # block files";
+ return filesCorrupted == 1;
}
/*
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Fri Apr 22 17:10:43 2011
@@ -244,7 +244,7 @@ public class TestCrcCorruption {
private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
long fileSize = 4096;
Path file = new Path("/testFile");
-
+ short replFactor = (short)numDataNodes;
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
@@ -253,11 +253,12 @@ public class TestCrcCorruption {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
- DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
- DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+ DFSTestUtil.createFile(fs, file, fileSize, replFactor, 12345L /*seed*/);
+ DFSTestUtil.waitReplication(fs, file, replFactor);
String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
- cluster.corruptBlockOnDataNodes(block);
+ int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+ assertEquals("All replicas not corrupted", replFactor, blockFilesCorrupted);
try {
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Fri Apr 22 17:10:43 2011
@@ -24,7 +24,6 @@ import java.net.URL;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.*;
-import java.nio.channels.FileChannel;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -142,36 +140,11 @@ public class TestDatanodeBlockScanner ex
cluster.shutdown();
}
- public static boolean corruptReplica(String blockName, int replica) throws IOException {
- Random random = new Random();
- File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
- boolean corrupted = false;
- for (int i=replica*2; i<replica*2+2; i++) {
- File blockFile = new File(baseDir, "data" + (i+1) +
- MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
- if (blockFile.exists()) {
- // Corrupt replica by writing random bytes into replica
- RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- FileChannel channel = raFile.getChannel();
- String badString = "BADBAD";
- int rand = random.nextInt((int)channel.size()/2);
- raFile.seek(rand);
- raFile.write(badString.getBytes());
- raFile.close();
- corrupted = true;
- }
- }
- return corrupted;
- }
-
- public void testBlockCorruptionPolicy() throws IOException {
+ public void testBlockCorruptionPolicy() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
Random random = new Random();
FileSystem fs = null;
- DFSClient dfsClient = null;
- LocatedBlocks blocks = null;
- int blockCount = 0;
int rand = random.nextInt(3);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@@ -181,44 +154,24 @@ public class TestDatanodeBlockScanner ex
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
- dfsClient = new DFSClient(new InetSocketAddress("localhost",
- cluster.getNameNodePort()), conf);
- do {
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- blockCount = blocks.get(0).getLocations().length;
- try {
- LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- } while (blockCount != 3);
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitReplication(fs, file1, (short)3);
+ assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Corrupt random replica of block
- assertTrue(corruptReplica(block, rand));
+ assertTrue(cluster.corruptReplica(block, rand));
// Restart the datanode hoping the corrupt block to be reported
cluster.restartDataNode(rand);
// We have 2 good replicas and block is not corrupt
- do {
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- blockCount = blocks.get(0).getLocations().length;
- try {
- LOG.info("Looping until expected blockCount of 2 is received: " + blockCount);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- } while (blockCount != 2);
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitReplication(fs, file1, (short)2);
+ assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Corrupt all replicas. Now, block should be marked as corrupt
// and we should get all the replicas
- assertTrue(corruptReplica(block, 0));
- assertTrue(corruptReplica(block, 1));
- assertTrue(corruptReplica(block, 2));
+ assertTrue(cluster.corruptReplica(block, 0));
+ assertTrue(cluster.corruptReplica(block, 1));
+ assertTrue(cluster.corruptReplica(block, 2));
// Read the file to trigger reportBadBlocks by client
try {
@@ -230,18 +183,8 @@ public class TestDatanodeBlockScanner ex
// We now have the blocks to be marked as corrupt and we get back all
// its replicas
- do {
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- blockCount = blocks.get(0).getLocations().length;
- try {
- LOG.info("Looping until expected blockCount of 3 is received");
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- } while (blockCount != 3);
- assertTrue(blocks.get(0).isCorrupt() == true);
-
+ DFSTestUtil.waitReplication(fs, file1, (short)3);
+ assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
cluster.shutdown();
}
@@ -261,7 +204,7 @@ public class TestDatanodeBlockScanner ex
* 4. Test again waits until the block is reported with expected number
* of good replicas.
*/
- public void testBlockCorruptionRecoveryPolicy() throws IOException {
+ public void testBlockCorruptionRecoveryPolicy() throws Exception {
// Test recovery of 1 corrupt replica
LOG.info("Testing corrupt replica recovery for one corrupt replica");
blockCorruptionRecoveryPolicy(4, (short)3, 1);
@@ -274,50 +217,30 @@ public class TestDatanodeBlockScanner ex
private void blockCorruptionRecoveryPolicy(int numDataNodes,
short numReplicas,
int numCorruptReplicas)
- throws IOException {
+ throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 30);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
- FileSystem fs = null;
- DFSClient dfsClient = null;
- LocatedBlocks blocks = null;
- int replicaCount = 0;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
- fs = cluster.getFileSystem();
+ FileSystem fs = cluster.getFileSystem();
Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
Block blk = DFSTestUtil.getFirstBlock(fs, file1);
String block = blk.getBlockName();
-
- dfsClient = new DFSClient(new InetSocketAddress("localhost",
- cluster.getNameNodePort()), conf);
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
// Wait until block is replicated to numReplicas
- while (replicaCount != numReplicas) {
- try {
- LOG.info("Looping until expected replicaCount of " + numReplicas +
- "is reached");
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- }
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitReplication(fs, file1, numReplicas);
// Corrupt numCorruptReplicas replicas of block
int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
- if (corruptReplica(block, i))
+ if (cluster.corruptReplica(block, i)) {
corruptReplicasDNIDs[j++] = i;
+ }
}
// Restart the datanodes containing corrupt replicas
@@ -331,61 +254,19 @@ public class TestDatanodeBlockScanner ex
+ cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getSelfAddr());
cluster.restartDataNode(corruptReplicasDNIDs[i]);
}
-
+
// Loop until all corrupt replicas are reported
- int corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- while (corruptReplicaSize != numCorruptReplicas) {
- try {
- IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(),
- conf, true);
- } catch (IOException e) {
- }
- try {
- LOG.info("Looping until expected " + numCorruptReplicas + " are " +
- "reported. Current reported " + corruptReplicaSize);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- }
+ DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
+ blk, numCorruptReplicas);
// Loop until the block recovers after replication
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- while (replicaCount != numReplicas) {
- try {
- LOG.info("Looping until block gets rereplicated to " + numReplicas);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- }
+ DFSTestUtil.waitReplication(fs, file1, numReplicas);
+ assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Make sure the corrupt replica is invalidated and removed from
// corruptReplicasMap
- corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
- try {
- LOG.info("Looping until corrupt replica is invalidated");
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- }
- // Make sure block is healthy
- assertTrue(corruptReplicaSize == 0);
- assertTrue(replicaCount == numReplicas);
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
+ blk, 0);
cluster.shutdown();
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java Fri Apr 22 17:10:43 2011
@@ -68,7 +68,7 @@ public class TestMissingBlocksAlert exte
// Corrupt the block
String block = DFSTestUtil.getFirstBlock(dfs, corruptFile).getBlockName();
- TestDatanodeBlockScanner.corruptReplica(block, 0);
+ assertTrue(cluster.corruptReplica(block, 0));
// read the file so that the corrupt block is reported to NN
FSDataInputStream in = dfs.open(corruptFile);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java Fri Apr 22 17:10:43 2011
@@ -151,6 +151,7 @@ public class TestReplication extends Tes
DFSClient dfsClient = null;
LocatedBlocks blocks = null;
int replicaCount = 0;
+ short replFactor = 1;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@@ -159,16 +160,19 @@ public class TestReplication extends Tes
// Create file with replication factor of 1
Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
- DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
- DFSTestUtil.waitReplication(fs, file1, (short)1);
+ DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+ DFSTestUtil.waitReplication(fs, file1, replFactor);
// Corrupt the block belonging to the created file
String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
- cluster.corruptBlockOnDataNodes(block);
-
+
+ int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+ assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted);
+
// Increase replication factor, this should invoke transfer request
// Receiving datanode fails on checksum and reports it to namenode
- fs.setReplication(file1, (short)2);
+ replFactor = 2;
+ fs.setReplication(file1, replFactor);
// Now get block details and check if the block is corrupt
blocks = dfsClient.getNamenode().
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Apr 22 17:10:43 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
/**
@@ -57,4 +58,19 @@ public class NameNodeAdapter {
public static Server getRpcServer(NameNode namenode) {
return namenode.server;
}
+
+ /**
+ * Return a tuple of the replica state (number of racks, number of live
+ * replicas, and 1 if the block is in neededReplications, else 0) for the given block.
+ * @param namenode to proxy the invocation to.
+ */
+ public static int[] getReplicaInfo(NameNode namenode, Block b) {
+ FSNamesystem ns = namenode.getNamesystem();
+ ns.readLock();
+ int[] r = {ns.blockManager.getNumberOfRacks(b),
+ ns.blockManager.countNodes(b).liveReplicas(),
+ ns.blockManager.neededReplications.contains(b) ? 1 : 0};
+ ns.readUnlock();
+ return r;
+ }
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java Fri Apr 22 17:10:43 2011
@@ -18,141 +18,458 @@
package org.apache.hadoop.hdfs.server.namenode;
+import java.util.ArrayList;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import junit.framework.TestCase;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
-public class TestBlocksWithNotEnoughRacks extends TestCase {
+import static org.junit.Assert.*;
+import org.junit.Test;
+public class TestBlocksWithNotEnoughRacks {
+ public static final Log LOG = LogFactory.getLog(TestBlocksWithNotEnoughRacks.class);
static {
- ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL) ;
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
}
- private static final Log LOG =
- LogFactory.getLog(TestBlocksWithNotEnoughRacks.class.getName());
- //Creates a block with all datanodes on same rack
- //Adds additional datanode on a different rack
- //The block should be replicated to the new rack
- public void testSufficientlyReplicatedBlocksWithNotEnoughRacks() throws Exception {
+ /*
+ * Return a configuration object with low timeouts for testing and
+ * a topology script set (which enables rack awareness).
+ */
+ private Configuration getConf() {
Configuration conf = new HdfsConfiguration();
+
+ // Lower the heart beat interval so the NN quickly learns of dead
+ // or decommissioned DNs and the NN issues replication and invalidation
+ // commands quickly (as replies to heartbeats)
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+
+ // Have the NN ReplicationMonitor compute the replication and
+ // invalidation commands to send DNs every second.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+
+ // Have the NN check for pending replications every second so it
+ // quickly schedules additional replicas as they are identified.
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
+
+ // The DNs report blocks every second.
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+
+ // Indicates we have multiple racks
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+ return conf;
+ }
+
+ /*
+ * Creates a block with all datanodes on the same rack, though the block
+ * is sufficiently replicated. Adds an additional datanode on a new rack.
+ * The block should be replicated to the new rack.
+ */
+ @Test
+ public void testSufficientlyReplBlocksUsesNewRack() throws Exception {
+ Configuration conf = getConf();
final short REPLICATION_FACTOR = 3;
- final String FILE_NAME = "/testFile";
- final Path FILE_PATH = new Path(FILE_NAME);
- //All datanodes are on the same rack
- String racks[] = {"/rack1","/rack1","/rack1",} ;
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).racks(racks).build();
+ final Path filePath = new Path("/testFile");
+ // All datanodes are on the same rack
+ String racks[] = {"/rack1", "/rack1", "/rack1"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+
+ try {
+ // Create a file with one block with a replication factor of 3
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+
+ // Add a new datanode on a different rack
+ String newRacks[] = {"/rack2"};
+ cluster.startDataNodes(conf, 1, true, null, newRacks);
+ cluster.waitActive();
+
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /*
+ * Like the previous test but the block starts with a single replica,
+ * and therefore unlike the previous test the block does not start
+ * off needing replicas.
+ */
+ @Test
+ public void testSufficientlySingleReplBlockUsesNewRack() throws Exception {
+ Configuration conf = getConf();
+ short REPLICATION_FACTOR = 1;
+ final Path filePath = new Path("/testFile");
+
+ String racks[] = {"/rack1", "/rack1", "/rack1", "/rack2"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+ try {
+ // Create a file with one block with a replication factor of 1
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
+
+ REPLICATION_FACTOR = 2;
+ ns.setReplication("/testFile", REPLICATION_FACTOR);
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /*
+ * Creates a block with all datanodes on the same rack. Add additional
+ * datanodes on a different rack and increase the replication factor,
+ * making sure there are enough replicas across racks. If the previous
+ * test passes this one should too, however this test may pass when
+ * the previous one fails because the replication code is explicitly
+ * triggered by setting the replication factor.
+ */
+ @Test
+ public void testUnderReplicatedUsesNewRacks() throws Exception {
+ Configuration conf = getConf();
+ short REPLICATION_FACTOR = 3;
+ final Path filePath = new Path("/testFile");
+ // All datanodes are on the same rack
+ String racks[] = {"/rack1", "/rack1", "/rack1", "/rack1", "/rack1"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
try {
- // create a file with one block with a replication factor of 3
+ // Create a file with one block
final FileSystem fs = cluster.getFileSystem();
- DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
- DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
- Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
- final FSNamesystem namesystem = cluster.getNamesystem();
- int numRacks = namesystem.blockManager.getNumberOfRacks(b);
- NumberReplicas number = namesystem.blockManager.countNodes(b);
- int curReplicas = number.liveReplicas();
- int neededReplicationSize =
- namesystem.blockManager.neededReplications.size();
+ // Add new datanodes on a different rack and increase the
+ // replication factor so the block is underreplicated and make
+ // sure at least one of the hosts on the new rack is used.
+ String newRacks[] = {"/rack2", "/rack2"};
+ cluster.startDataNodes(conf, 2, true, null, newRacks);
+ REPLICATION_FACTOR = 5;
+ ns.setReplication("/testFile", REPLICATION_FACTOR);
+
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /*
+ * Test that a block that is re-replicated because one of its replicas
+ * is found to be corrupt is re-replicated across racks.
+ */
+ @Test
+ public void testCorruptBlockRereplicatedAcrossRacks() throws Exception {
+ Configuration conf = getConf();
+ short REPLICATION_FACTOR = 2;
+ int fileLen = 512;
+ final Path filePath = new Path("/testFile");
+ // Datanodes are spread across two racks
+ String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+ try {
+ // Create a file with one block with a replication factor of 2
+ final FileSystem fs = cluster.getFileSystem();
- //Add a new datanode on a different rack
- String newRacks[] = {"/rack2"} ;
- cluster.startDataNodes(conf, 1, true, null, newRacks);
+ DFSTestUtil.createFile(fs, filePath, fileLen, REPLICATION_FACTOR, 1L);
+ final String fileContent = DFSTestUtil.readFile(fs, filePath);
+
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
- while ( (numRacks < 2) || (curReplicas != REPLICATION_FACTOR) ||
- (neededReplicationSize > 0) ) {
- LOG.info("Waiting for replication");
- Thread.sleep(600);
- numRacks = namesystem.blockManager.getNumberOfRacks(b);
- number = namesystem.blockManager.countNodes(b);
- curReplicas = number.liveReplicas();
- neededReplicationSize =
- namesystem.blockManager.neededReplications.size();
+ // Corrupt a replica of the block
+ int dnToCorrupt = DFSTestUtil.firstDnWithBlock(cluster, b);
+ assertTrue(cluster.corruptReplica(b.getBlockName(), dnToCorrupt));
+
+ // Restart the datanode so blocks are re-scanned, and the corrupt
+ // block is detected.
+ cluster.restartDataNode(dnToCorrupt);
+
+ // Wait for the namenode to notice the corrupt replica
+ DFSTestUtil.waitCorruptReplicas(fs, ns, filePath, b, 1);
+
+ // The rack policy is still respected
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+ // Ensure all replicas are valid (the corrupt replica may not
+ // have been cleaned up yet).
+ for (int i = 0; i < racks.length; i++) {
+ String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+ if (blockContent != null && i != dnToCorrupt) {
+ assertEquals("Corrupt replica", fileContent, blockContent);
+ }
}
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /*
+ * Reduce the replication factor of a file, making sure that the only
+ * cross rack replica is not removed when deleting replicas.
+ */
+ @Test
+ public void testReduceReplFactorRespectsRackPolicy() throws Exception {
+ Configuration conf = getConf();
+ short REPLICATION_FACTOR = 3;
+ final Path filePath = new Path("/testFile");
+ String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
- LOG.info("curReplicas = " + curReplicas);
- LOG.info("numRacks = " + numRacks);
- LOG.info("Size = " + namesystem.blockManager.neededReplications.size());
+ try {
+ // Create a file with one block
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+ // Decrease the replication factor, make sure the deleted replica
+ // was not the one that lived on the rack with only one replica,
+ // ie we should still have 2 racks after reducing the repl factor.
+ REPLICATION_FACTOR = 2;
+ ns.setReplication("/testFile", REPLICATION_FACTOR);
- assertEquals(2,numRacks);
- assertTrue(curReplicas == REPLICATION_FACTOR);
- assertEquals(0,namesystem.blockManager.neededReplications.size());
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
-
}
- public void testUnderReplicatedNotEnoughRacks() throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
- conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "xyz");
+ /*
+ * Test that when a block is replicated because a replica is lost due
+ * to host failure the rack policy is preserved.
+ */
+ @Test
+ public void testReplDueToNodeFailRespectsRackPolicy() throws Exception {
+ Configuration conf = getConf();
short REPLICATION_FACTOR = 3;
- final String FILE_NAME = "/testFile";
- final Path FILE_PATH = new Path(FILE_NAME);
- //All datanodes are on the same rack
- String racks[] = {"/rack1","/rack1","/rack1",} ;
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).racks(racks).build();
+ final Path filePath = new Path("/testFile");
+ // Last two datanodes are on a different rack
+ String racks[] = {"/rack1", "/rack1", "/rack1", "/rack2", "/rack2"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
try {
- // create a file with one block with a replication factor of 3
+ // Create a file with one block with a replication factor of 3
final FileSystem fs = cluster.getFileSystem();
- DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
- DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+ // Make the last datanode look like it failed to heartbeat by
+ // calling removeDatanode and stopping it.
+ ArrayList<DataNode> datanodes = cluster.getDataNodes();
+ int idx = datanodes.size() - 1;
+ DataNode dataNode = datanodes.get(idx);
+ cluster.stopDataNode(idx);
+ ns.removeDatanode(dataNode.dnRegistration);
+
+ // The block should still have sufficient # replicas, across racks.
+ // The last node may not have contained a replica, but if it did
+ // it should have been replicated within the same rack.
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
- Block b = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
- final FSNamesystem namesystem = cluster.getNamesystem();
- int numRacks = namesystem.blockManager.getNumberOfRacks(b);
- NumberReplicas number = namesystem.blockManager.countNodes(b);
- int curReplicas = number.liveReplicas();
- int neededReplicationSize =
- namesystem.blockManager.neededReplications.size();
+ // Fail the last datanode again, it's also on rack2 so there is
+ // only 1 rack for all the replicas
+ datanodes = cluster.getDataNodes();
+ idx = datanodes.size() - 1;
+ dataNode = datanodes.get(idx);
+ cluster.stopDataNode(idx);
+ ns.removeDatanode(dataNode.dnRegistration);
+
+ // Make sure we have enough live replicas even though we are
+ // short one rack and therefore need one replica
+ DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /*
+ * Test that when the excess replicas of a block are reduced due to
+ * a node re-joining the cluster the rack policy is not violated.
+ */
+ @Test
+ public void testReduceReplFactorDueToRejoinRespectsRackPolicy()
+ throws Exception {
+ Configuration conf = getConf();
+ short REPLICATION_FACTOR = 2;
+ final Path filePath = new Path("/testFile");
+ // Last datanode is on a different rack
+ String racks[] = {"/rack1", "/rack1", "/rack2"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+ try {
+ // Create a file with one block
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+ // Make the last (cross rack) datanode look like it failed
+ // to heartbeat by stopping it and calling removeDatanode.
+ ArrayList<DataNode> datanodes = cluster.getDataNodes();
+ assertEquals(3, datanodes.size());
+ DataNode dataNode = datanodes.get(2);
+ cluster.stopDataNode(2);
+ ns.removeDatanode(dataNode.dnRegistration);
+
+ // The block gets re-replicated to another datanode so it has a
+ // sufficient # replicas, but not across racks, so there should
+ // be 1 rack, and 1 needed replica (even though there are 2 hosts
+ // available and only 2 replicas required).
+ DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1);
+
+ // Start the "failed" datanode, which has a replica so the block is
+ // now over-replicated and therefore a replica should be removed but
+ // not on the restarted datanode as that would violate the rack policy.
+ String rack2[] = {"/rack2"};
+ cluster.startDataNodes(conf, 1, true, null, rack2);
+ cluster.waitActive();
- //Add a new datanode on a different rack
- String newRacks[] = {"/rack2","/rack2","/rack2"} ;
- cluster.startDataNodes(conf, 3, true, null, newRacks);
- REPLICATION_FACTOR = 5;
- namesystem.setReplication(FILE_NAME, REPLICATION_FACTOR);
+ // The block now has sufficient # replicas, across racks
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
- while ( (numRacks < 2) || (curReplicas < REPLICATION_FACTOR) ||
- (neededReplicationSize > 0) ) {
- LOG.info("Waiting for replication");
- Thread.sleep(600);
- numRacks = namesystem.blockManager.getNumberOfRacks(b);
- number = namesystem.blockManager.countNodes(b);
- curReplicas = number.liveReplicas();
- neededReplicationSize =
- namesystem.blockManager.neededReplications.size();
- }
+ /*
+ * Test that rack policy is still respected when blocks are replicated
+ * due to node decommissioning.
+ */
+ @Test
+ public void testNodeDecomissionRespectsRackPolicy() throws Exception {
+ Configuration conf = getConf();
+ short REPLICATION_FACTOR = 2;
+ final Path filePath = new Path("/testFile");
+
+ // Configure an excludes file
+ FileSystem localFileSys = FileSystem.getLocal(conf);
+ Path workingDir = localFileSys.getWorkingDirectory();
+ Path dir = new Path(workingDir, "build/test/data/temp/decommission");
+ Path excludeFile = new Path(dir, "exclude");
+ assertTrue(localFileSys.mkdirs(dir));
+ DFSTestUtil.writeFile(localFileSys, excludeFile, "");
+ conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+
+ // Four datanodes spread across two racks
+ String racks[] = {"/rack1", "/rack1", "/rack2", "/rack2"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+ try {
+ // Create a file with one block
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+ // Decommission one of the hosts with the block, this should cause
+ // the block to get replicated to another host on the same rack,
+ // otherwise the rack policy is violated.
+ BlockLocation locs[] = fs.getFileBlockLocations(
+ fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
+ String name = locs[0].getNames()[0];
+ DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+ ns.refreshNodes(conf);
+ DFSTestUtil.waitForDecommission(fs, name);
- LOG.info("curReplicas = " + curReplicas);
- LOG.info("numRacks = " + numRacks);
- LOG.info("Size = " + namesystem.blockManager.neededReplications.size());
+ // Check the block still has sufficient # replicas across racks
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /*
+ * Test that rack policy is still respected when blocks are replicated
+ * due to node decommissioning, when the blocks are over-replicated.
+ */
+ @Test
+ public void testNodeDecomissionWithOverreplicationRespectsRackPolicy()
+ throws Exception {
+ Configuration conf = getConf();
+ short REPLICATION_FACTOR = 5;
+ final Path filePath = new Path("/testFile");
+
+ // Configure an excludes file
+ FileSystem localFileSys = FileSystem.getLocal(conf);
+ Path workingDir = localFileSys.getWorkingDirectory();
+ Path dir = new Path(workingDir, "build/test/data/temp/decommission");
+ Path excludeFile = new Path(dir, "exclude");
+ assertTrue(localFileSys.mkdirs(dir));
+ DFSTestUtil.writeFile(localFileSys, excludeFile, "");
+ conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
+
+ // All hosts are on two racks, only one host on /rack2
+ String racks[] = {"/rack1", "/rack2", "/rack1", "/rack1", "/rack1"};
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(racks.length).racks(racks).build();
+ final FSNamesystem ns = cluster.getNameNode().getNamesystem();
+
+ try {
+ final FileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L);
+ Block b = DFSTestUtil.getFirstBlock(fs, filePath);
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
+
+ // Lower the replication factor so the blocks are over replicated
+ REPLICATION_FACTOR = 2;
+ fs.setReplication(filePath, REPLICATION_FACTOR);
+
+ // Decommission one of the hosts with the block that is not on
+ // the lone host on rack2 (if we decommission that host it would
+ // be impossible to respect the rack policy).
+ BlockLocation locs[] = fs.getFileBlockLocations(
+ fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
+ for (String top : locs[0].getTopologyPaths()) {
+ if (!top.startsWith("/rack2")) {
+ String name = top.substring("/rack1".length()+1);
+ DFSTestUtil.writeFile(localFileSys, excludeFile, name);
+ ns.refreshNodes(conf);
+ DFSTestUtil.waitForDecommission(fs, name);
+ break;
+ }
+ }
- assertEquals(2,numRacks);
- assertTrue(curReplicas == REPLICATION_FACTOR);
- assertEquals(0,namesystem.blockManager.neededReplications.size());
+ // Check the block still has sufficient # replicas across racks,
+ // ie we didn't remove the replica on the host on /rack2.
+ DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
}
-
}
-}
+}
\ No newline at end of file
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestCorruptFilesJsp.java Fri Apr 22 17:10:43 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import java.net.URL;
import java.util.Collection;
@@ -31,7 +32,6 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.junit.Test;
/** A JUnit test for corrupt_files.jsp */
@@ -82,7 +82,7 @@ public class TestCorruptFilesJsp {
for (int idx = 0; idx < filepaths.length - 1; idx++) {
String blockName = DFSTestUtil.getFirstBlock(fs, filepaths[idx])
.getBlockName();
- TestDatanodeBlockScanner.corruptReplica(blockName, 0);
+ assertTrue(cluster.corruptReplica(blockName, 0));
// read the file so that the corrupt block is reported to NN
FSDataInputStream in = fs.open(filepaths[idx]);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java?rev=1095963&r1=1095962&r2=1095963&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java Fri Apr 22 17:10:43 2011
@@ -54,7 +54,7 @@ public class TestOverReplicatedBlocks ex
// corrupt the block on datanode 0
Block block = DFSTestUtil.getFirstBlock(fs, fileName);
- TestDatanodeBlockScanner.corruptReplica(block.getBlockName(), 0);
+ assertTrue(cluster.corruptReplica(block.getBlockName(), 0));
DataNodeProperties dnProps = cluster.stopDataNode(0);
// remove block scanner log to trigger block scanning
File scanLog = new File(System.getProperty("test.build.data"),