You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/04/29 05:03:27 UTC
svn commit: r1097671 [2/3] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/
src/c++/libhdfs/ src/contrib/ src/contrib/hdfsproxy/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/co...
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Apr 29 03:03:25 2011
@@ -461,7 +461,9 @@ abstract class INode implements Comparab
long nsQuota,
long dsQuota,
long preferredBlockSize) {
- if (blocks == null) {
+ if (symlink.length() != 0) { // check if symbolic link
+ return new INodeSymlink(symlink, modificationTime, atime, permissions);
+ } else if (blocks == null) { //not sym link and blocks null? directory!
if (nsQuota >= 0 || dsQuota >= 0) {
return new INodeDirectoryWithQuota(
permissions, modificationTime, nsQuota, dsQuota);
@@ -469,10 +471,6 @@ abstract class INode implements Comparab
// regular directory
return new INodeDirectory(permissions, modificationTime);
}
- // check if symbolic link
- if (symlink.length() != 0) {
- return new INodeSymlink(symlink, modificationTime, atime, permissions);
- }
// file
return new INodeFile(permissions, blocks, replication,
modificationTime, atime, preferredBlockSize);
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/UnderReplicatedBlocks.java Fri Apr 29 03:03:25 2011
@@ -86,9 +86,8 @@ class UnderReplicatedBlocks implements I
int curReplicas,
int decommissionedReplicas,
int expectedReplicas) {
- if (curReplicas<0) {
- return LEVEL;
- } else if (curReplicas>=expectedReplicas) {
+ assert curReplicas >= 0 : "Negative replicas!";
+ if (curReplicas >= expectedReplicas) {
return 3; // Block doesn't have enough racks
} else if(curReplicas==0) {
// If there are zero non-decommissioned replica but there are
@@ -116,9 +115,7 @@ class UnderReplicatedBlocks implements I
int curReplicas,
int decomissionedReplicas,
int expectedReplicas) {
- if(curReplicas<0) {
- return false;
- }
+ assert curReplicas >= 0 : "Negative replicas!";
int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
expectedReplicas);
if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
@@ -159,8 +156,10 @@ class UnderReplicatedBlocks implements I
}
return true;
} else {
+ // Try to remove the block from all queues if the block was
+ // not found in the queue for the given priority level.
for(int i=0; i<LEVEL; i++) {
- if(i!=priLevel && priorityQueues.get(i).remove(block)) {
+ if(priorityQueues.get(i).remove(block)) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -202,7 +201,7 @@ class UnderReplicatedBlocks implements I
"BLOCK* NameSystem.UnderReplicationBlock.update:"
+ block
+ " has only "+curReplicas
- + " replicas and need " + curExpectedReplicas
+ + " replicas and needs " + curExpectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + curPri);
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java Fri Apr 29 03:03:25 2011
@@ -37,8 +37,9 @@ import static org.apache.hadoop.hdfs.too
@InterfaceStability.Unstable
class EditsLoaderCurrent implements EditsLoader {
- private static int [] supportedVersions = {
- -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31 };
+ private static int[] supportedVersions = {
+ -18, -19, -20, -21, -22, -23, -24,
+ -25, -26, -27, -28, -30, -31, -32, -33, -34 };
private EditsVisitor v;
private int editsVersion = 0;
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Apr 29 03:03:25 2011
@@ -119,8 +119,8 @@ import org.apache.hadoop.security.token.
class ImageLoaderCurrent implements ImageLoader {
protected final DateFormat dateFormat =
new SimpleDateFormat("yyyy-MM-dd HH:mm");
- private static int [] versions =
- {-16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -30, -31};
+ private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
+ -24, -25, -26, -27, -28, -30, -31, -32, -33, -34 };
private int imageVersion = 0;
/* (non-Javadoc)
Propchange: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Apr 29 03:03:25 2011
@@ -2,4 +2,4 @@
/hadoop/core/trunk/src/test/hdfs:776175-785643
/hadoop/hdfs/branches/HDFS-265/src/test/hdfs:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/test/hdfs:820487
-/hadoop/hdfs/trunk/src/test/hdfs:1086482-1095244
+/hadoop/hdfs/trunk/src/test/hdfs:1086482-1097628
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Apr 29 03:03:25 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -27,6 +28,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLConnection;
@@ -36,6 +39,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -50,15 +54,19 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.junit.Assert;
+
+import static org.junit.Assert.*;
/** Utilities for HDFS tests */
public class DFSTestUtil {
@@ -143,6 +151,12 @@ public class DFSTestUtil {
replicationFactor, files[idx].getSeed());
}
}
+
+ public static String readFile(FileSystem fs, Path fileName) throws IOException {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ IOUtils.copyBytes(fs.open(fileName), os, 1024, true);
+ return os.toString();
+ }
public static void createFile(FileSystem fs, Path fileName, long fileLen,
short replFactor, long seed) throws IOException {
@@ -172,8 +186,6 @@ public class DFSTestUtil {
/** check if the files have been copied correctly. */
public boolean checkFiles(FileSystem fs, String topdir) throws IOException {
-
- //Configuration conf = new HdfsConfiguration();
Path root = new Path(topdir);
for (int idx = 0; idx < nFiles; idx++) {
@@ -206,9 +218,10 @@ public class DFSTestUtil {
}
}
- // waits for the replication factor of all files to reach the
- // specified target
- //
+ /*
+ * Waits for the replication factor of all files to reach the
+ * specified target.
+ */
public void waitReplication(FileSystem fs, String topdir, short value)
throws IOException {
Path root = new Path(topdir);
@@ -219,6 +232,128 @@ public class DFSTestUtil {
}
}
+ /*
+ * Check if the given block in the given file is corrupt.
+ */
+ public static boolean allBlockReplicasCorrupt(MiniDFSCluster cluster,
+ Path file, int blockNo) throws IOException {
+ DFSClient client = new DFSClient(new InetSocketAddress("localhost",
+ cluster.getNameNodePort()), cluster.getConfiguration());
+ LocatedBlocks blocks;
+ try {
+ blocks = client.getNamenode().getBlockLocations(
+ file.toString(), 0, Long.MAX_VALUE);
+ } finally {
+ client.close();
+ }
+ return blocks.get(blockNo).isCorrupt();
+ }
+
+ /*
+ * Wait up to 20s for the given block to be replicated across
+ * the requested number of racks, with the requested number of
+ * replicas, and the requested number of replicas still needed.
+ */
+ public static void waitForReplication(MiniDFSCluster cluster, Block b,
+ int racks, int replicas, int neededReplicas)
+ throws IOException, TimeoutException, InterruptedException {
+ int curRacks = 0;
+ int curReplicas = 0;
+ int curNeededReplicas = 0;
+ int count = 0;
+ final int ATTEMPTS = 20;
+
+ do {
+ Thread.sleep(1000);
+ int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b);
+ curRacks = r[0];
+ curReplicas = r[1];
+ curNeededReplicas = r[2];
+ count++;
+ } while ((curRacks != racks ||
+ curReplicas != replicas ||
+ curNeededReplicas != neededReplicas) && count < ATTEMPTS);
+
+ if (count == ATTEMPTS) {
+ throw new TimeoutException("Timed out waiting for replication."
+ + " Needed replicas = "+neededReplicas
+ + " Cur needed replicas = "+curNeededReplicas
+ + " Replicas = "+replicas+" Cur replicas = "+curReplicas
+ + " Racks = "+racks+" Cur racks = "+curRacks);
+ }
+ }
+
+ /*
+ * Keep accessing the given file until the namenode reports that the
+ * given block in the file contains the given number of corrupt replicas.
+ */
+ public static void waitCorruptReplicas(FileSystem fs, FSNamesystem ns,
+ Path file, Block b, int corruptRepls)
+ throws IOException, TimeoutException {
+ int count = 0;
+ final int ATTEMPTS = 20;
+ int repls = ns.numCorruptReplicas(b);
+ while (repls != corruptRepls && count < ATTEMPTS) {
+ try {
+ IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
+ 512, true);
+ } catch (IOException e) {
+ // Swallow exceptions
+ }
+ System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
+ repls = ns.numCorruptReplicas(b);
+ count++;
+ }
+ if (count == ATTEMPTS) {
+ throw new TimeoutException("Timed out waiting for corrupt replicas."
+ + " Waiting for "+corruptRepls+", but only found "+repls);
+ }
+ }
+
+ /*
+ * Wait up to 20s for the given DN (host:port) to be decommissioned.
+ */
+ public static void waitForDecommission(FileSystem fs, String name)
+ throws IOException, InterruptedException, TimeoutException {
+ DatanodeInfo dn = null;
+ int count = 0;
+ final int ATTEMPTS = 20;
+
+ do {
+ Thread.sleep(1000);
+ DistributedFileSystem dfs = (DistributedFileSystem)fs;
+ for (DatanodeInfo info : dfs.getDataNodeStats()) {
+ if (name.equals(info.getName())) {
+ dn = info;
+ }
+ }
+ count++;
+ } while ((dn == null ||
+ dn.isDecommissionInProgress() ||
+ !dn.isDecommissioned()) && count < ATTEMPTS);
+
+ if (count == ATTEMPTS) {
+ throw new TimeoutException("Timed out waiting for datanode "
+ + name + " to decommission.");
+ }
+ }
+
+ /*
+ * Returns the index of the first datanode which has a copy
+ * of the given block, or -1 if no such datanode exists.
+ */
+ public static int firstDnWithBlock(MiniDFSCluster cluster, Block b)
+ throws IOException {
+ int numDatanodes = cluster.getDataNodes().size();
+ for (int i = 0; i < numDatanodes; i++) {
+ String blockContent = cluster.readBlockOnDataNode(i, b.getBlockName());
+ if (blockContent != null) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
/** return list of filenames created as part of createFiles */
public String[] getFileNames(String topDir) {
if (nFiles == 0)
@@ -241,10 +376,13 @@ public class DFSTestUtil {
BlockLocation locs[] = fs.getFileBlockLocations(
fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
for (int j = 0; j < locs.length; j++) {
- String[] loc = locs[j].getHosts();
- if (loc.length != replFactor) {
- System.out.println("File " + fileName + " has replication factor " +
- loc.length);
+ String[] hostnames = locs[j].getNames();
+ if (hostnames.length != replFactor) {
+ String hostNameList = "";
+ for (String h : hostnames) hostNameList += h + " ";
+ System.out.println("Block " + j + " of file " + fileName
+ + " has replication factor " + hostnames.length + "; locations "
+ + hostNameList);
good = false;
try {
System.out.println("Waiting for replication factor to drain");
@@ -253,6 +391,10 @@ public class DFSTestUtil {
break;
}
}
+ if (good) {
+ System.out.println("All blocks of file " + fileName
+ + " verified to have replication factor " + replFactor);
+ }
} while(!good);
}
@@ -285,7 +427,7 @@ public class DFSTestUtil {
).getLogger().setLevel(org.apache.log4j.Level.ALL);
}
- static String readFile(File f) throws IOException {
+ public static String readFile(File f) throws IOException {
StringBuilder b = new StringBuilder();
BufferedReader in = new BufferedReader(new FileReader(f));
for(int c; (c = in.read()) != -1; b.append((char)c));
@@ -293,6 +435,17 @@ public class DFSTestUtil {
return b.toString();
}
+ /* Write the given string to the given file */
+ public static void writeFile(FileSystem fs, Path p, String s)
+ throws IOException {
+ if (fs.exists(p)) {
+ fs.delete(p, true);
+ }
+ InputStream is = new ByteArrayInputStream(s.getBytes());
+ FSDataOutputStream os = fs.create(p);
+ IOUtils.copyBytes(is, os, s.length(), true);
+ }
+
// Returns url content as string.
public static String urlGet(URL url) throws IOException {
URLConnection conn = url.openConnection();
@@ -402,7 +555,7 @@ public class DFSTestUtil {
/** For {@link TestTransferRbw} */
public static DataTransferProtocol.Status transferRbw(final Block b,
final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
- Assert.assertEquals(2, datanodes.length);
+ assertEquals(2, datanodes.length);
final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
datanodes.length, dfsClient);
final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Fri Apr 29 03:03:25 2011
@@ -878,29 +878,69 @@ public class MiniDFSCluster {
System.out.println("Cluster is active");
}
- /*
- * Corrupt a block on all datanode
+ /**
+ * Return the contents of the given block on the given datanode.
+ *
+ * @param i The index of the datanode
+ * @param blockName The name of the block
+ * @throws IOException on error accessing the file for the given block
+ * @return The contents of the block file, null if none found
*/
- void corruptBlockOnDataNodes(String blockName) throws Exception{
- for (int i=0; i < dataNodes.size(); i++)
- corruptBlockOnDataNode(i,blockName);
+ public String readBlockOnDataNode(int i, String blockName) throws IOException {
+ assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+
+ // Each datanode has multiple data dirs, check each
+ for (int dn = i*2; dn < i*2+2; dn++) {
+ File dataDir = new File(getBaseDirectory() + "data");
+ File blockFile = new File(dataDir,
+ "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+ if (blockFile.exists()) {
+ return DFSTestUtil.readFile(blockFile);
+ }
+ }
+ return null;
}
- /*
- * Corrupt a block on a particular datanode
+ /**
+ * Corrupt a block on all datanodes.
+ *
+ * @param blockName The name of the block
+ * @throws IOException on error accessing the given block.
+ * @return The number of block files corrupted.
+ */
+ public int corruptBlockOnDataNodes(String blockName) throws IOException {
+ int blocksCorrupted = 0;
+ for (int i=0; i < dataNodes.size(); i++) {
+ if (corruptReplica(blockName, i)) {
+ blocksCorrupted++;
+ }
+ }
+ return blocksCorrupted;
+ }
+
+ /**
+ * Corrupt a block on a particular datanode.
+ *
+ * @param i The index of the datanode
+ * @param blockName The name of the block
+ * @throws IOException on error accessing the given block or if
+ * the contents of the block (on the same datanode) differ.
+ * @return true if a replica was corrupted, false otherwise
*/
- boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
+ public boolean corruptReplica(String blockName, int i) throws IOException {
Random random = new Random();
- boolean corrupted = false;
- File dataDir = new File(getBaseDirectory() + "data");
- if (i < 0 || i >= dataNodes.size())
- return false;
+ assert (i >= 0 && i < dataNodes.size()) : "Invalid datanode "+i;
+ int filesCorrupted = 0;
+
+ // Each datanode has multiple data dirs, check each
for (int dn = i*2; dn < i*2+2; dn++) {
- File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
- blockName);
- System.out.println("Corrupting for: " + blockFile);
- if (blockFile.exists()) {
- // Corrupt replica by writing random bytes into replica
+ File dataDir = new File(getBaseDirectory() + "data");
+ File blockFile = new File(dataDir,
+ "data" + (dn+1) + FINALIZED_DIR_NAME + blockName);
+
+ // Corrupt the replica by writing some bytes into a random offset
+ if (blockFile.exists()) {
+ System.out.println("Corrupting " + blockFile);
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
FileChannel channel = raFile.getChannel();
String badString = "BADBAD";
@@ -908,10 +948,12 @@ public class MiniDFSCluster {
raFile.seek(rand);
raFile.write(badString.getBytes());
raFile.close();
+ filesCorrupted++;
}
- corrupted = true;
}
- return corrupted;
+ assert filesCorrupted == 0 || filesCorrupted == 1
+ : "Unexpected # block files";
+ return filesCorrupted == 1;
}
/*
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestCrcCorruption.java Fri Apr 29 03:03:25 2011
@@ -244,7 +244,7 @@ public class TestCrcCorruption {
private void doTestEntirelyCorruptFile(int numDataNodes) throws Exception {
long fileSize = 4096;
Path file = new Path("/testFile");
-
+ short replFactor = (short)numDataNodes;
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
@@ -253,11 +253,12 @@ public class TestCrcCorruption {
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
- DFSTestUtil.createFile(fs, file, fileSize, (short)numDataNodes, 12345L /*seed*/);
- DFSTestUtil.waitReplication(fs, file, (short)numDataNodes);
+ DFSTestUtil.createFile(fs, file, fileSize, replFactor, 12345L /*seed*/);
+ DFSTestUtil.waitReplication(fs, file, replFactor);
String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
- cluster.corruptBlockOnDataNodes(block);
+ int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+ assertEquals("All replicas not corrupted", replFactor, blockFilesCorrupted);
try {
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(), conf,
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Apr 29 03:03:25 2011
@@ -380,7 +380,7 @@ public class TestDFSClientRetries extend
int bufferSize = 4096;
Configuration conf = new HdfsConfiguration();
- conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_XCIEVERS_KEY,xcievers);
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY, xcievers);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY,
retries);
conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWin);
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSUpgrade.java Fri Apr 29 03:03:25 2011
@@ -29,17 +29,20 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
+import org.junit.Test;
+import static org.junit.Assert.*;
/**
* This test ensures the appropriate response (successful or failure) from
* the system when the system is upgraded under various storage state and
* version conditions.
*/
-public class TestDFSUpgrade extends TestCase {
+public class TestDFSUpgrade {
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.TestDFSUpgrade");
@@ -141,6 +144,7 @@ public class TestDFSUpgrade extends Test
* This test attempts to upgrade the NameNode and DataNode under
* a number of valid and invalid conditions.
*/
+ @Test
public void testUpgrade() throws Exception {
File[] baseDirs;
UpgradeUtilities.initialize();
@@ -256,15 +260,23 @@ public class TestDFSUpgrade extends Test
} // end numDir loop
}
- protected void tearDown() throws Exception {
- LOG.info("Shutting down MiniDFSCluster");
- if (cluster != null) cluster.shutdown();
+ @Test(expected=IOException.class)
+ public void testUpgradeFromPreUpgradeLVFails() throws IOException {
+ // Upgrade from versions prior to Storage#LAST_UPGRADABLE_LAYOUT_VERSION
+ // is not allowed
+ Storage.checkVersionUpgradable(Storage.LAST_PRE_UPGRADE_LAYOUT_VERSION + 1);
+ fail("Expected IOException is not thrown");
}
-
+
+ public void test203LayoutVersion() {
+ for (int lv : Storage.LAYOUT_VERSIONS_203) {
+ assertTrue(Storage.is203LayoutVersion(lv));
+ }
+ }
+
public static void main(String[] args) throws Exception {
new TestDFSUpgrade().testUpgrade();
}
-
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Fri Apr 29 03:03:25 2011
@@ -21,10 +21,10 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URL;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.io.*;
-import java.nio.channels.FileChannel;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -48,6 +47,8 @@ public class TestDatanodeBlockScanner ex
private static final Log LOG =
LogFactory.getLog(TestDatanodeBlockScanner.class);
+ private static final long TIMEOUT = 20000; // 20 sec.
+
private static Pattern pattern =
Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");
@@ -55,18 +56,35 @@ public class TestDatanodeBlockScanner ex
Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");
/**
* This connects to datanode and fetches block verification data.
- * It repeats this until the given block has a verification time > 0.
+ * It repeats this until the given block has a verification time > newTime.
+ * @param newTime - validation timestamps before newTime are "old", the
+ * result of previous validations. This method waits until a "new"
+ * validation timestamp is obtained. If no validator runs soon
+ * enough, the method will time out.
+ * @return - the new validation timestamp
+ * @throws IOException
+ * @throws TimeoutException
*/
private static long waitForVerification(DatanodeInfo dn, FileSystem fs,
- Path file, int blocksValidated) throws IOException {
+ Path file, int blocksValidated,
+ long newTime, long timeout)
+ throws IOException, TimeoutException {
URL url = new URL("http://localhost:" + dn.getInfoPort() +
"/blockScannerReport?listblocks");
long lastWarnTime = System.currentTimeMillis();
+ if (newTime <= 0) newTime = 1L;
long verificationTime = 0;
String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
-
- while (verificationTime <= 0) {
+ long failtime = (timeout <= 0) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + timeout;
+ while (verificationTime < newTime) {
+ if (failtime < System.currentTimeMillis()) {
+ throw new TimeoutException("failed to achieve block verification after "
+ + timeout + " msec. Current verification timestamp = "
+ + verificationTime + ", requested verification time > "
+ + newTime);
+ }
String response = DFSTestUtil.urlGet(url);
if(blocksValidated >= 0) {
for(Matcher matcher = pattern_blockVerify.matcher(response); matcher.find();) {
@@ -83,7 +101,7 @@ public class TestDatanodeBlockScanner ex
}
}
- if (verificationTime <= 0) {
+ if (verificationTime < newTime) {
long now = System.currentTimeMillis();
if ((now - lastWarnTime) >= 5*1000) {
LOG.info("Waiting for verification of " + block);
@@ -98,8 +116,7 @@ public class TestDatanodeBlockScanner ex
return verificationTime;
}
- public void testDatanodeBlockScanner() throws IOException {
-
+ public void testDatanodeBlockScanner() throws IOException, TimeoutException {
long startTime = System.currentTimeMillis();
Configuration conf = new HdfsConfiguration();
@@ -115,6 +132,7 @@ public class TestDatanodeBlockScanner ex
*/
DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
cluster.shutdown();
+
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.format(false).build();
@@ -128,7 +146,7 @@ public class TestDatanodeBlockScanner ex
/*
* The cluster restarted. The block should be verified by now.
*/
- assertTrue(waitForVerification(dn, fs, file1, 1) > startTime);
+ assertTrue(waitForVerification(dn, fs, file1, 1, startTime, TIMEOUT) >= startTime);
/*
* Create a new file and read the block. The block should be marked
@@ -137,41 +155,16 @@ public class TestDatanodeBlockScanner ex
DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(),
conf, true);
- assertTrue(waitForVerification(dn, fs, file2, 2) > startTime);
+ assertTrue(waitForVerification(dn, fs, file2, 2, startTime, TIMEOUT) >= startTime);
cluster.shutdown();
}
- public static boolean corruptReplica(String blockName, int replica) throws IOException {
- Random random = new Random();
- File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
- boolean corrupted = false;
- for (int i=replica*2; i<replica*2+2; i++) {
- File blockFile = new File(baseDir, "data" + (i+1) +
- MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
- if (blockFile.exists()) {
- // Corrupt replica by writing random bytes into replica
- RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- FileChannel channel = raFile.getChannel();
- String badString = "BADBAD";
- int rand = random.nextInt((int)channel.size()/2);
- raFile.seek(rand);
- raFile.write(badString.getBytes());
- raFile.close();
- corrupted = true;
- }
- }
- return corrupted;
- }
-
- public void testBlockCorruptionPolicy() throws IOException {
+ public void testBlockCorruptionPolicy() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
Random random = new Random();
FileSystem fs = null;
- DFSClient dfsClient = null;
- LocatedBlocks blocks = null;
- int blockCount = 0;
int rand = random.nextInt(3);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
@@ -181,44 +174,24 @@ public class TestDatanodeBlockScanner ex
DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
- dfsClient = new DFSClient(new InetSocketAddress("localhost",
- cluster.getNameNodePort()), conf);
- do {
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- blockCount = blocks.get(0).getLocations().length;
- try {
- LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- } while (blockCount != 3);
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitReplication(fs, file1, (short)3);
+ assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Corrupt random replica of block
- assertTrue(corruptReplica(block, rand));
+ assertTrue(cluster.corruptReplica(block, rand));
// Restart the datanode hoping the corrupt block to be reported
cluster.restartDataNode(rand);
// We have 2 good replicas and block is not corrupt
- do {
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- blockCount = blocks.get(0).getLocations().length;
- try {
- LOG.info("Looping until expected blockCount of 2 is received: " + blockCount);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- } while (blockCount != 2);
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitReplication(fs, file1, (short)2);
+ assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Corrupt all replicas. Now, block should be marked as corrupt
// and we should get all the replicas
- assertTrue(corruptReplica(block, 0));
- assertTrue(corruptReplica(block, 1));
- assertTrue(corruptReplica(block, 2));
+ assertTrue(cluster.corruptReplica(block, 0));
+ assertTrue(cluster.corruptReplica(block, 1));
+ assertTrue(cluster.corruptReplica(block, 2));
// Read the file to trigger reportBadBlocks by client
try {
@@ -230,18 +203,8 @@ public class TestDatanodeBlockScanner ex
// We now have the blocks to be marked as corrupt and we get back all
// its replicas
- do {
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- blockCount = blocks.get(0).getLocations().length;
- try {
- LOG.info("Looping until expected blockCount of 3 is received");
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- } while (blockCount != 3);
- assertTrue(blocks.get(0).isCorrupt() == true);
-
+ DFSTestUtil.waitReplication(fs, file1, (short)3);
+ assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
cluster.shutdown();
}
@@ -261,11 +224,13 @@ public class TestDatanodeBlockScanner ex
* 4. Test again waits until the block is reported with expected number
* of good replicas.
*/
- public void testBlockCorruptionRecoveryPolicy() throws IOException {
+ public void testBlockCorruptionRecoveryPolicy1() throws Exception {
// Test recovery of 1 corrupt replica
LOG.info("Testing corrupt replica recovery for one corrupt replica");
blockCorruptionRecoveryPolicy(4, (short)3, 1);
+ }
+ public void testBlockCorruptionRecoveryPolicy2() throws Exception {
// Test recovery of 2 corrupt replicas
LOG.info("Testing corrupt replica recovery for two corrupt replicas");
blockCorruptionRecoveryPolicy(5, (short)3, 2);
@@ -274,111 +239,58 @@ public class TestDatanodeBlockScanner ex
private void blockCorruptionRecoveryPolicy(int numDataNodes,
short numReplicas,
int numCorruptReplicas)
- throws IOException {
+ throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 30);
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30L);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
- FileSystem fs = null;
- DFSClient dfsClient = null;
- LocatedBlocks blocks = null;
- int replicaCount = 0;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
- fs = cluster.getFileSystem();
+ FileSystem fs = cluster.getFileSystem();
Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
Block blk = DFSTestUtil.getFirstBlock(fs, file1);
String block = blk.getBlockName();
-
- dfsClient = new DFSClient(new InetSocketAddress("localhost",
- cluster.getNameNodePort()), conf);
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
// Wait until block is replicated to numReplicas
- while (replicaCount != numReplicas) {
- try {
- LOG.info("Looping until expected replicaCount of " + numReplicas +
- "is reached");
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- }
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitReplication(fs, file1, numReplicas);
// Corrupt numCorruptReplicas replicas of block
int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
- if (corruptReplica(block, i))
+ if (cluster.corruptReplica(block, i)) {
corruptReplicasDNIDs[j++] = i;
+ LOG.info("successfully corrupted block " + block + " on node "
+ + i + " " + cluster.getDataNodes().get(i).getSelfAddr());
+ }
}
// Restart the datanodes containing corrupt replicas
// so they would be reported to namenode and re-replicated
- for (int i =0; i < numCorruptReplicas; i++)
- cluster.restartDataNode(corruptReplicasDNIDs[i]);
+ // They MUST be restarted in reverse order from highest to lowest index,
+ // because the act of restarting them removes them from the ArrayList
+ // and causes the indexes of all nodes above them in the list to change.
+ for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
+ LOG.info("restarting node with corrupt replica: position "
+ + i + " node " + corruptReplicasDNIDs[i] + " "
+ + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getSelfAddr());
+ cluster.restartDataNode(corruptReplicasDNIDs[i]);
+ }
// Loop until all corrupt replicas are reported
- int corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- while (corruptReplicaSize != numCorruptReplicas) {
- try {
- IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(),
- conf, true);
- } catch (IOException e) {
- }
- try {
- LOG.info("Looping until expected " + numCorruptReplicas + " are " +
- "reported. Current reported " + corruptReplicaSize);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- }
+ DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
+ blk, numCorruptReplicas);
// Loop until the block recovers after replication
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- while (replicaCount != numReplicas) {
- try {
- LOG.info("Looping until block gets rereplicated to " + numReplicas);
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- }
+ DFSTestUtil.waitReplication(fs, file1, numReplicas);
+ assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Make sure the corrupt replica is invalidated and removed from
// corruptReplicasMap
- corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
- try {
- LOG.info("Looping until corrupt replica is invalidated");
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- corruptReplicaSize = cluster.getNamesystem().
- numCorruptReplicas(blk);
- blocks = dfsClient.getNamenode().
- getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
- replicaCount = blocks.get(0).getLocations().length;
- }
- // Make sure block is healthy
- assertTrue(corruptReplicaSize == 0);
- assertTrue(replicaCount == numReplicas);
- assertTrue(blocks.get(0).isCorrupt() == false);
+ DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
+ blk, 0);
cluster.shutdown();
}
@@ -386,36 +298,73 @@ public class TestDatanodeBlockScanner ex
public void testTruncatedBlockReport() throws Exception {
final Configuration conf = new HdfsConfiguration();
final short REPLICATION_FACTOR = (short)2;
+ final Path fileName = new Path("/file1");
+ String block; //block file name
+
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L);
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR).build();
+ long startTime = System.currentTimeMillis();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(REPLICATION_FACTOR)
+ .build();
cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
+
try {
- final Path fileName = new Path("/file1");
+ FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
+ block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
+ } finally {
+ cluster.shutdown();
+ }
- String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
-
+ // Restart cluster and confirm block is verified on datanode 0,
+ // then truncate it on datanode 0.
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(REPLICATION_FACTOR)
+ .format(false)
+ .build();
+ cluster.waitActive();
+ try {
+ FileSystem fs = cluster.getFileSystem();
+ DatanodeInfo dn = new DatanodeInfo(cluster.getDataNodes().get(0).dnRegistration);
+ assertTrue(waitForVerification(dn, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
+
// Truncate replica of block
- changeReplicaLength(block, 0, -1);
-
+ if (!changeReplicaLength(block, 0, -1)) {
+ throw new IOException(
+ "failed to find or change length of replica on node 0 "
+ + cluster.getDataNodes().get(0).getSelfAddr());
+ }
+ } finally {
cluster.shutdown();
+ }
- // restart the cluster
- cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION_FACTOR)
- .format(false)
- .build();
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive(); // now we have 3 datanodes
+ // Restart the cluster, add a node, and check that the truncated block is
+ // handled correctly
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(REPLICATION_FACTOR)
+ .format(false)
+ .build();
+ cluster.startDataNodes(conf, 1, true, null, null);
+ cluster.waitActive(); // now we have 3 datanodes
+
+ // Assure the cluster has left safe mode.
+ cluster.waitClusterUp();
+ assertFalse("failed to leave safe mode",
+ cluster.getNameNode().isInSafeMode());
- // wait for truncated block be detected and the block to be replicated
+ try {
+ // wait for the truncated block to be detected by the block scanner,
+ // and the block to be replicated
DFSTestUtil.waitReplication(
cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
// Make sure that truncated block will be deleted
- waitForBlockDeleted(block, 0);
+ waitForBlockDeleted(block, 0, TIMEOUT);
} finally {
cluster.shutdown();
}
@@ -431,22 +380,35 @@ public class TestDatanodeBlockScanner ex
MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
if (blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- raFile.setLength(raFile.length()+lenDelta);
+ long origLen = raFile.length();
+ raFile.setLength(origLen + lenDelta);
raFile.close();
+ LOG.info("assigned length " + (origLen + lenDelta)
+ + " to block file " + blockFile.getPath()
+ + " on datanode " + dnIndex);
return true;
}
}
+ LOG.info("failed to change length of block " + blockName);
return false;
}
- private static void waitForBlockDeleted(String blockName, int dnIndex)
- throws IOException, InterruptedException {
+ private static void waitForBlockDeleted(String blockName, int dnIndex,
+ long timeout)
+ throws IOException, TimeoutException, InterruptedException {
File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1) +
MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2) +
MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
+ long failtime = System.currentTimeMillis()
+ + ((timeout > 0) ? timeout : Long.MAX_VALUE);
while (blockFile1.exists() || blockFile2.exists()) {
+ if (failtime < System.currentTimeMillis()) {
+ throw new TimeoutException("waited too long for blocks to be deleted: "
+ + blockFile1.getPath() + (blockFile1.exists() ? " still exists; " : " is absent; ")
+ + blockFile2.getPath() + (blockFile2.exists() ? " still exists." : " is absent."));
+ }
Thread.sleep(100);
}
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Apr 29 03:03:25 2011
@@ -44,6 +44,10 @@ import org.junit.Test;
public class TestDistributedFileSystem {
private static final Random RAN = new Random();
+ {
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
private boolean dualPortTesting = false;
private HdfsConfiguration getTestConfiguration() {
@@ -100,26 +104,94 @@ public class TestDistributedFileSystem {
@Test
public void testDFSClient() throws Exception {
Configuration conf = getTestConfiguration();
+ final long grace = 1000L;
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
- final Path filepath = new Path("/test/LeaseChecker/foo");
+ final String filepathstring = "/test/LeaseChecker/foo";
+ final Path[] filepaths = new Path[4];
+ for(int i = 0; i < filepaths.length; i++) {
+ filepaths[i] = new Path(filepathstring + i);
+ }
final long millis = System.currentTimeMillis();
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
-
- //create a file
- FSDataOutputStream out = dfs.create(filepath);
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
+ dfs.dfs.leasechecker.setGraceSleepPeriod(grace);
+ assertFalse(dfs.dfs.leasechecker.isRunning());
- //write something and close
- out.writeLong(millis);
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
- out.close();
- assertTrue(dfs.dfs.isLeaseCheckerStarted());
+ {
+ //create a file
+ final FSDataOutputStream out = dfs.create(filepaths[0]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //write something
+ out.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close
+ out.close();
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ for(int i = 0; i < 3; i++) {
+ if (dfs.dfs.leasechecker.isRunning()) {
+ Thread.sleep(grace/2);
+ }
+ }
+ //passed grace period
+ assertFalse(dfs.dfs.leasechecker.isRunning());
+ }
+
+ {
+ //create file1
+ final FSDataOutputStream out1 = dfs.create(filepaths[1]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //create file2
+ final FSDataOutputStream out2 = dfs.create(filepaths[2]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+
+ //write something to file1
+ out1.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file1
+ out1.close();
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+
+ //write something to file2
+ out2.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file2
+ out2.close();
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ }
+
+ {
+ //create file3
+ final FSDataOutputStream out3 = dfs.create(filepaths[3]);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ Thread.sleep(grace/4*3);
+ //passed previous grace period, should still be running
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //write something to file3
+ out3.writeLong(millis);
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ //close file3
+ out3.close();
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ Thread.sleep(grace/4*3);
+ //within grace period
+ assertTrue(dfs.dfs.leasechecker.isRunning());
+ for(int i = 0; i < 3; i++) {
+ if (dfs.dfs.leasechecker.isRunning()) {
+ Thread.sleep(grace/2);
+ }
+ }
+ //passed grace period
+ assertFalse(dfs.dfs.leasechecker.isRunning());
+ }
+
dfs.close();
}
@@ -146,15 +218,15 @@ public class TestDistributedFileSystem {
{
DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
//open and check the file
- FSDataInputStream in = dfs.open(filepath);
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ FSDataInputStream in = dfs.open(filepaths[0]);
+ assertFalse(dfs.dfs.leasechecker.isRunning());
assertEquals(millis, in.readLong());
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
in.close();
- assertFalse(dfs.dfs.isLeaseCheckerStarted());
+ assertFalse(dfs.dfs.leasechecker.isRunning());
dfs.close();
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java Fri Apr 29 03:03:25 2011
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.*;
+
import java.io.File;
import java.io.IOException;
+import java.io.FileNotFoundException;
import java.net.InetSocketAddress;
import java.util.List;
-import junit.framework.TestCase;
+import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -41,7 +44,7 @@ import org.apache.hadoop.hdfs.server.dat
* This class tests the building blocks that are needed to
* support HDFS appends.
*/
-public class TestFileAppend extends TestCase {
+public class TestFileAppend{
boolean simulatedStorage = false;
private static byte[] fileContents = null;
@@ -101,6 +104,7 @@ public class TestFileAppend extends Test
* Test that copy on write for blocks works correctly
* @throws IOException an exception might be thrown
*/
+ @Test
public void testCopyOnWrite() throws IOException {
Configuration conf = new HdfsConfiguration();
if (simulatedStorage) {
@@ -171,6 +175,7 @@ public class TestFileAppend extends Test
* Test a simple flush on a simple HDFS file.
* @throws IOException an exception might be thrown
*/
+ @Test
public void testSimpleFlush() throws IOException {
Configuration conf = new HdfsConfiguration();
if (simulatedStorage) {
@@ -226,6 +231,7 @@ public class TestFileAppend extends Test
* Test that file data can be flushed.
* @throws IOException an exception might be thrown
*/
+ @Test
public void testComplexFlush() throws IOException {
Configuration conf = new HdfsConfiguration();
if (simulatedStorage) {
@@ -268,4 +274,26 @@ public class TestFileAppend extends Test
cluster.shutdown();
}
}
+
+ /**
+ * FileNotFoundException is expected for appending to a non-existing file
+ *
+ * @throws FileNotFoundException as the result
+ */
+ @Test(expected = FileNotFoundException.class)
+ public void testFileNotFound() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ if (simulatedStorage) {
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ }
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ FileSystem fs = cluster.getFileSystem();
+ try {
+ Path file1 = new Path("/nonexistingfile.dat");
+ fs.append(file1);
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java Fri Apr 29 03:03:25 2011
@@ -68,7 +68,7 @@ public class TestMissingBlocksAlert exte
// Corrupt the block
String block = DFSTestUtil.getFirstBlock(dfs, corruptFile).getBlockName();
- TestDatanodeBlockScanner.corruptReplica(block, 0);
+ assertTrue(cluster.corruptReplica(block, 0));
// read the file so that the corrupt block is reported to NN
FSDataInputStream in = dfs.open(corruptFile);
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java Fri Apr 29 03:03:25 2011
@@ -151,6 +151,7 @@ public class TestReplication extends Tes
DFSClient dfsClient = null;
LocatedBlocks blocks = null;
int replicaCount = 0;
+ short replFactor = 1;
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@@ -159,16 +160,19 @@ public class TestReplication extends Tes
// Create file with replication factor of 1
Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
- DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
- DFSTestUtil.waitReplication(fs, file1, (short)1);
+ DFSTestUtil.createFile(fs, file1, 1024, replFactor, 0);
+ DFSTestUtil.waitReplication(fs, file1, replFactor);
// Corrupt the block belonging to the created file
String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
- cluster.corruptBlockOnDataNodes(block);
-
+
+ int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
+ assertEquals("Corrupted too few blocks", replFactor, blockFilesCorrupted);
+
// Increase replication factor, this should invoke transfer request
// Receiving datanode fails on checksum and reports it to namenode
- fs.setReplication(file1, (short)2);
+ replFactor = 2;
+ fs.setReplication(file1, replFactor);
// Now get block details and check if the block is corrupt
blocks = dfsClient.getNamenode().
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Fri Apr 29 03:03:25 2011
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.mortbay.log.Log;
import junit.framework.TestCase;
/**
@@ -53,6 +55,9 @@ public class TestBalancer extends TestCa
ClientProtocol client;
+ static final long TIMEOUT = 20000L; //msec
+ static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
+ static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
static final int DEFAULT_BLOCK_SIZE = 10;
private Balancer balancer;
private Random r = new Random();
@@ -186,28 +191,101 @@ public class TestBalancer extends TestCa
cluster.shutdown();
}
- /* wait for one heartbeat */
- private void waitForHeartBeat( long expectedUsedSpace, long expectedTotalSpace )
- throws IOException {
- long[] status = client.getStats();
- while(status[0] != expectedTotalSpace || status[1] != expectedUsedSpace ) {
+ /**
+ * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE,
+ * summed over all nodes. Times out after TIMEOUT msec.
+ * @param expectedUsedSpace
+ * @param expectedTotalSpace
+ * @throws IOException - if getStats() fails
+ * @throws TimeoutException
+ */
+ private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+ throws IOException, TimeoutException {
+ long timeout = TIMEOUT;
+ long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + timeout;
+
+ while (true) {
+ long[] status = client.getStats();
+ double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace)
+ / expectedTotalSpace;
+ double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace)
+ / expectedUsedSpace;
+ if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE
+ && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
+ break; //done
+
+ if (System.currentTimeMillis() > failtime) {
+ throw new TimeoutException("Cluster failed to reached expected values of "
+ + "totalSpace (current: " + status[0]
+ + ", expected: " + expectedTotalSpace
+ + "), or usedSpace (current: " + status[1]
+ + ", expected: " + expectedUsedSpace
+ + "), in more than " + timeout + " msec.");
+ }
try {
Thread.sleep(100L);
} catch(InterruptedException ignored) {
}
- status = client.getStats();
}
}
+
+ /**
+ * Wait until balanced: each datanode gives utilization within
+ * BALANCE_ALLOWED_VARIANCE of average
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ private void waitForBalancer(long totalUsedSpace, long totalCapacity)
+ throws IOException, TimeoutException {
+ long timeout = TIMEOUT;
+ long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + timeout;
+ final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
+ boolean balanced;
+ do {
+ DatanodeInfo[] datanodeReport =
+ client.getDatanodeReport(DatanodeReportType.ALL);
+ assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+ balanced = true;
+ for (DatanodeInfo datanode : datanodeReport) {
+ double nodeUtilization = ((double)datanode.getDfsUsed())
+ / datanode.getCapacity();
+ if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
+ balanced = false;
+ if (System.currentTimeMillis() > failtime) {
+ throw new TimeoutException(
+ "Rebalancing expected avg utilization to become "
+ + avgUtilization + ", but on datanode " + datanode
+ + " it remains at " + nodeUtilization
+ + " after more than " + TIMEOUT + " msec.");
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ break;
+ }
+ }
+ } while (!balanced);
+ }
- /* This test start a one-node cluster, fill the node to be 30% full;
- * It then adds an empty node and start balancing.
- * @param newCapacity new node's capacity
- * @param new
+ /** This test starts a cluster with the specified number of nodes,
+ * and fills it to be 30% full (with a single file replicated identically
+ * to all datanodes);
+ * It then adds one new empty node and starts balancing.
+ *
+ * @param conf - configuration
+ * @param capacities - array of capacities of original nodes in cluster
+ * @param racks - array of racks for original nodes in cluster
+ * @param newCapacity - new node's capacity
+ * @param newRack - new node's rack
+ * @throws Exception
*/
- private void test(Configuration conf, long[] capacities, String[] racks,
+ private void doTest(Configuration conf, long[] capacities, String[] racks,
long newCapacity, String newRack) throws Exception {
+ assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length;
- assertEquals(numOfDatanodes, racks.length);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(capacities.length)
.racks(racks)
@@ -247,26 +325,8 @@ public class TestBalancer extends TestCa
balancer.run(new String[0]);
waitForHeartBeat(totalUsedSpace, totalCapacity);
- boolean balanced;
- do {
- DatanodeInfo[] datanodeReport =
- client.getDatanodeReport(DatanodeReportType.ALL);
- assertEquals(datanodeReport.length, cluster.getDataNodes().size());
- balanced = true;
- double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
- for(DatanodeInfo datanode:datanodeReport) {
- if(Math.abs(avgUtilization-
- ((double)datanode.getDfsUsed())/datanode.getCapacity()*100)>10) {
- balanced = false;
- try {
- Thread.sleep(100);
- } catch(InterruptedException ignored) {
- }
- break;
- }
- }
- } while(!balanced);
-
+ Log.info("Rebalancing.");
+ waitForBalancer(totalUsedSpace, totalCapacity);
}
private void runBalancerDefaultConstructor(Configuration conf,
@@ -279,37 +339,19 @@ public class TestBalancer extends TestCa
balancer.run(new String[0]);
waitForHeartBeat(totalUsedSpace, totalCapacity);
- boolean balanced;
- do {
- DatanodeInfo[] datanodeReport = client
- .getDatanodeReport(DatanodeReportType.ALL);
- assertEquals(datanodeReport.length, cluster.getDataNodes().size());
- balanced = true;
- double avgUtilization = ((double) totalUsedSpace) / totalCapacity * 100;
- for (DatanodeInfo datanode : datanodeReport) {
- if (Math.abs(avgUtilization - ((double) datanode.getDfsUsed())
- / datanode.getCapacity() * 100) > 10) {
- balanced = false;
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- break;
- }
- }
- } while (!balanced);
-
+ Log.info("Rebalancing with default ctor.");
+ waitForBalancer(totalUsedSpace, totalCapacity);
}
/** one-node cluster test*/
private void oneNodeTest(Configuration conf) throws Exception {
// add an empty node with half of the CAPACITY & the same rack
- test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+ doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
}
/** two-node cluster test */
private void twoNodeTest(Configuration conf) throws Exception {
- test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+ doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
CAPACITY, RACK2);
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1097671&r1=1097670&r2=1097671&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Apr 29 03:03:25 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
import java.io.IOException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
/**
@@ -57,4 +58,19 @@ public class NameNodeAdapter {
public static Server getRpcServer(NameNode namenode) {
return namenode.server;
}
+
+ /**
+ * Return a tuple of the replica state (number of racks, number of live
+ * replicas, and 1 if the block is in neededReplications, else 0) for the given block.
+ * @param namenode to proxy the invocation to.
+ */
+ public static int[] getReplicaInfo(NameNode namenode, Block b) {
+ FSNamesystem ns = namenode.getNamesystem();
+ ns.readLock();
+ int[] r = {ns.blockManager.getNumberOfRacks(b),
+ ns.blockManager.countNodes(b).liveReplicas(),
+ ns.blockManager.neededReplications.contains(b) ? 1 : 0};
+ ns.readUnlock();
+ return r;
+ }
}