Posted to mapreduce-commits@hadoop.apache.org by sc...@apache.org on 2010/11/12 01:17:23 UTC
svn commit: r1034221 [2/2] - in /hadoop/mapreduce/trunk: ./
src/contrib/raid/src/java/org/apache/hadoop/hdfs/
src/contrib/raid/src/java/org/apache/hadoop/raid/
src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/
src/contrib/raid/src/test/org/apa...
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java Fri Nov 12 00:17:22 2010
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.RaidDFSUtil;
import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.raid.RaidNode;
import org.apache.hadoop.raid.RaidUtils;
@@ -520,7 +522,7 @@ public class TestBlockFixer extends Test
assertTrue(corrupted);
}
- void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
+ static void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
long blockSize) throws IOException {
FSDataInputStream in = fs.open(file);
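
reportCorruptBlocks becomes static here so that other tests in this commit (TestRaidShell and TestReedSolomonDecoder below) can call TestBlockFixer.reportCorruptBlocks directly. The method body is truncated in this diff; the following is a minimal sketch of what such a helper plausibly does, assuming the usual HDFS behavior that a read hitting a ChecksumException causes the client to report the bad replica to the NameNode:

    // Sketch only -- the real body is truncated in the hunk above.
    static void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
        long blockSize) throws IOException {
      FSDataInputStream in = fs.open(file);
      for (int idx : idxs) {
        long offset = idx * blockSize;
        try {
          in.seek(offset);
          in.readByte();  // read into the corrupted block
          fail("Expected ChecksumException at offset " + offset);
        } catch (ChecksumException e) {
          // Expected: the DFS client reports the corrupt replica to the
          // NameNode as a side effect of the failed read.
        }
      }
      in.close();
    }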
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java Fri Nov 12 00:17:22 2010
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.Reporter;
+
import org.apache.hadoop.raid.protocol.PolicyInfo;
public class TestDirectoryTraversal extends TestCase {
@@ -89,6 +91,7 @@ public class TestDirectoryTraversal exte
}
public void testSuspension() throws IOException {
+ LOG.info("Starting testSuspension");
mySetup();
try {
@@ -107,7 +110,7 @@ public class TestDirectoryTraversal exte
Path raid = new Path("/raid");
DirectoryTraversal.FileFilter filter =
new RaidFilter.TimeBasedFilter(conf,
- RaidNode.getDestinationPath(conf), 1, System.currentTimeMillis(), 0);
+ RaidNode.xorDestinationPath(conf), 1, System.currentTimeMillis(), 0);
List<FileStatus> selected = dt.getFilteredFiles(filter, limit);
for (FileStatus f: selected) {
LOG.info(f.getPath());
@@ -124,6 +127,50 @@ public class TestDirectoryTraversal exte
}
}
+ public void testFileFilter() throws IOException {
+ mySetup();
+
+ try {
+ Path topDir = new Path(TEST_DIR + "/testFileFilter");
+ int targetRepl = 1;
+ createTestTree(topDir);
+ Path file = new Path(topDir.toString() + "/a/f1");
+ FileStatus stat = fs.getFileStatus(file);
+ PolicyInfo info = new PolicyInfo("testFileFilter", conf);
+ info.setSrcPath(topDir.toString());
+ info.setErasureCode("rs");
+ info.setDescription("test policy");
+ info.setProperty("targetReplication", "1");
+ info.setProperty("metaReplication", "1");
+
+ DirectoryTraversal.FileFilter timeBasedXORFilter =
+ new RaidFilter.TimeBasedFilter(conf,
+ RaidNode.xorDestinationPath(conf), targetRepl,
+ System.currentTimeMillis(), 0);
+ DirectoryTraversal.FileFilter timeBasedRSFilter =
+ new RaidFilter.TimeBasedFilter(conf,
+ RaidNode.rsDestinationPath(conf), targetRepl,
+ System.currentTimeMillis(), 0);
+ DirectoryTraversal.FileFilter preferenceForRSFilter =
+ new RaidFilter.PreferenceFilter(
+ conf, RaidNode.rsDestinationPath(conf),
+ RaidNode.xorDestinationPath(conf), 1, System.currentTimeMillis(), 0);
+
+ assertTrue(timeBasedXORFilter.check(stat));
+ assertTrue(timeBasedRSFilter.check(stat));
+ assertTrue(preferenceForRSFilter.check(stat));
+
+ RaidNode.doRaid(
+ conf, info, stat, new RaidNode.Statistics(), Reporter.NULL);
+
+ assertTrue(timeBasedXORFilter.check(stat));
+ assertFalse(timeBasedRSFilter.check(stat));
+ assertFalse(preferenceForRSFilter.check(stat));
+ } finally {
+ myTearDown();
+ }
+ }
+
/**
* Creates a test directory tree.
* top
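
For orientation, the assertions in testFileFilter above follow one rule: a filter selects a file only while the parity it would create is missing. Before doRaid every filter passes; after raiding with the "rs" policy, the RS filter and the RS-preferring filter reject the file, while the XOR filter still passes because no XOR parity was created. A toy model of that check under the stated assumption (this is not the real RaidFilter code, and the relative-path handling is simplified):

    // Toy model: select a file only if neither its own parity nor a
    // preferred (e.g. RS) parity already exists.
    static boolean select(FileSystem fs, FileStatus stat,
        Path parityDir, Path preferredParityDir) throws IOException {
      String rel = stat.getPath().toUri().getPath().substring(1);
      if (preferredParityDir != null &&
          fs.exists(new Path(preferredParityDir, rel))) {
        return false;  // a preferred code already covers this file
      }
      return !fs.exists(new Path(parityDir, rel));
    }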
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java Fri Nov 12 00:17:22 2010
@@ -65,6 +65,7 @@ public class TestRaidFilter extends Test
PolicyInfo info1 = new PolicyInfo("p1", conf);
info1.setSrcPath(src1.toString());
+ info1.setErasureCode("xor");
info1.setDescription("test policy");
info1.setProperty("targetReplication", "1");
info1.setProperty("metaReplication", "1");
@@ -72,6 +73,7 @@ public class TestRaidFilter extends Test
PolicyInfo info2 = new PolicyInfo("p2", conf);
info2.setSrcPath(src2.toString());
+ info2.setErasureCode("xor");
info2.setDescription("test policy");
info2.setProperty("targetReplication", "1");
info2.setProperty("metaReplication", "1");
@@ -103,7 +105,7 @@ public class TestRaidFilter extends Test
RaidFilter.Statistics stats = new RaidFilter.Statistics();
RaidFilter.TimeBasedFilter filter = new RaidFilter.TimeBasedFilter(
- conf, RaidNode.getDestinationPath(conf), info1, all,
+ conf, RaidNode.xorDestinationPath(conf), info1, all,
System.currentTimeMillis(), stats);
System.out.println("Stats " + stats);
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Fri Nov 12 00:17:22 2010
@@ -113,7 +113,7 @@ public class TestRaidHar extends TestCas
String str = "<configuration> " +
"<srcPath prefix=\"/user/test/raidtest\"> " +
"<policy name = \"RaidTest1\"> " +
- "<destPath> /destraid</destPath> " +
+ "<erasureCode>xor</erasureCode> " +
"<property> " +
"<name>targetReplication</name> " +
"<value>" + targetReplication + "</value> " +
@@ -247,6 +247,9 @@ public class TestRaidHar extends TestCas
}
}
if (count == 1 && listPaths.length == 1) {
+ Path partfile = new Path(harPath, "part-0");
+ assertEquals(fileSys.getFileStatus(partfile).getReplication(),
+ targetReplication);
break;
}
} catch (FileNotFoundException e) {
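
Note that the test policies now declare the erasure code inline (<erasureCode>xor</erasureCode>) instead of a <destPath> element. Pieced together from the fragments in this commit, a complete policy in the new style would look roughly as follows; the element ordering and the full schema are assumptions here:

    <configuration>
      <srcPath prefix="/user/test/raidtest">
        <policy name="RaidTest1">
          <erasureCode>xor</erasureCode>
          <property>
            <name>targetReplication</name>
            <value>1</value>
          </property>
          <property>
            <name>metaReplication</name>
            <value>1</value>
          </property>
          <property>
            <name>time_before_har</name>
            <value>0</value>
            <description> amount of time to wait before har'ing parity files
            </description>
          </property>
        </policy>
      </srcPath>
    </configuration>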
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java Fri Nov 12 00:17:22 2010
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.MiniMRCl
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
/**
* Test the generation of parity blocks for files with different block
@@ -122,12 +123,22 @@ public class TestRaidNode extends TestCa
policies = new java.util.ArrayList<String>();
}
+ public void addPolicy(String name, String path, String parent) {
+ String str =
+ "<srcPath prefix=\"" + path + "\"> " +
+ "<policy name = \"" + name + "\"> " +
+ "<parentPolicy>" + parent + "</parentPolicy>" +
+ "</policy>" +
+ "</srcPath>";
+ policies.add(str);
+ }
+
public void addPolicy(String name, short srcReplication,
long targetReplication, long metaReplication, long stripeLength) {
String str =
"<srcPath prefix=\"/user/dhruba/raidtest\"> " +
"<policy name = \"" + name + "\"> " +
- "<destPath> /destraid</destPath> " +
+ "<erasureCode>xor</erasureCode> " +
"<property> " +
"<name>srcReplication</name> " +
"<value>" + srcReplication + "</value> " +
@@ -169,7 +180,7 @@ public class TestRaidNode extends TestCa
String str =
"<srcPath prefix=\"" + path + "\"> " +
"<policy name = \"" + name + "\"> " +
- "<destPath> /destraid</destPath> " +
+ "<erasureCode>xor</erasureCode> " +
"<property> " +
"<name>srcReplication</name> " +
"<value>" + srcReplication + "</value> " +
@@ -280,7 +291,6 @@ public class TestRaidNode extends TestCa
// create an instance of the RaidNode
cnode = RaidNode.createRaidNode(null, conf);
- int times = 10;
FileStatus[] listPaths = null;
@@ -345,7 +355,7 @@ public class TestRaidNode extends TestCa
StringUtils.stringifyException(e));
throw e;
} finally {
- shell.close();
+ if (shell != null) shell.close();
if (cnode != null) { cnode.stop(); cnode.join(); }
LOG.info("doTestPathFilter delete file " + file1);
fileSys.delete(file1, true);
@@ -388,8 +398,14 @@ public class TestRaidNode extends TestCa
long firstmodtime = 0;
// wait till file is raided
while (true) {
- Thread.sleep(20000L); // waiting
- listPaths = fileSys.listStatus(destPath);
+ Thread.sleep(1000); // waiting
+ try {
+ listPaths = fileSys.listStatus(destPath);
+ } catch (FileNotFoundException e) {
+ LOG.warn("File not found " + destPath);
+ // The directory may have been deleted by the purge thread.
+ continue;
+ }
int count = 0;
if (listPaths != null && listPaths.length == 1) {
for (FileStatus s : listPaths) {
@@ -554,7 +570,7 @@ public class TestRaidNode extends TestCa
long crc, long corruptOffset) throws IOException {
// recover the file assuming that we encountered a corruption at offset 0
String[] args = new String[3];
- args[0] = "recover";
+ args[0] = "-recover";
args[1] = file1.toString();
args[2] = Long.toString(corruptOffset);
Path recover1 = shell.recover(args[0], args, 1)[0];
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java Fri Nov 12 00:17:22 2010
@@ -47,9 +47,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.hdfs.TestRaidDfs;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
/**
* If a file gets deleted, then verify that the parity file gets deleted too.
@@ -61,6 +63,7 @@ public class TestRaidPurge extends TestC
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode");
+ final Random rand = new Random();
{
((Log4JLogger)RaidNode.LOG).getLogger().setLevel(Level.ALL);
@@ -69,8 +72,11 @@ public class TestRaidPurge extends TestC
Configuration conf;
String namenode = null;
+ String hftp = null;
MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
FileSystem fileSys = null;
+ String jobTrackerName = null;
/**
* create mapreduce and dfs clusters
@@ -97,7 +103,7 @@ public class TestRaidPurge extends TestC
}
conf.set("raid.server.address", "localhost:0");
-
+
// create a dfs and map-reduce cluster
final int taskTrackers = 4;
final int jobTrackerPort = 60050;
@@ -106,25 +112,31 @@ public class TestRaidPurge extends TestC
dfs.waitActive();
fileSys = dfs.getFileSystem();
namenode = fileSys.getUri().toString();
+ mr = new MiniMRCluster(taskTrackers, namenode, 3);
+ jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+ FileSystem.setDefaultUri(conf, namenode);
+ conf.set("mapred.job.tracker", jobTrackerName);
}
/**
* create raid.xml file for RaidNode
*/
- private void mySetup(String srcPath, long targetReplication,
- long metaReplication, long stripeLength)
- throws Exception {
- mySetup(srcPath, targetReplication, metaReplication, stripeLength, 1);
+ private void mySetup(long targetReplication,
+ long metaReplication, long stripeLength) throws Exception {
+ int harDelay = 1; // 1 day.
+ mySetup(targetReplication, metaReplication, stripeLength, harDelay);
}
- private void mySetup(String srcPath, long targetReplication,
- long metaReplication, long stripeLength, int harDelay)
- throws Exception {
+ private void mySetup(long targetReplication,
+ long metaReplication, long stripeLength, int harDelay) throws Exception {
FileWriter fileWriter = new FileWriter(CONFIG_FILE);
fileWriter.write("<?xml version=\"1.0\"?>\n");
String str = "<configuration> " +
- "<srcPath prefix=\"" + srcPath + "\"> " +
+ "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
"<policy name = \"RaidTest1\"> " +
+ "<erasureCode>xor</erasureCode> " +
"<destPath> /destraid</destPath> " +
"<property> " +
"<name>targetReplication</name> " +
@@ -154,7 +166,7 @@ public class TestRaidPurge extends TestC
"<property> " +
"<name>time_before_har</name> " +
"<value> " + harDelay + "</value> " +
- "<description> time before har'ing parity files" +
+ "<description> amount of time waited before har'ing parity files" +
"</description> " +
"</property> " +
"</policy>" +
@@ -168,6 +180,7 @@ public class TestRaidPurge extends TestC
* stop clusters created earlier
*/
private void stopClusters() throws Exception {
+ if (mr != null) { mr.shutdown(); }
if (dfs != null) { dfs.shutdown(); }
}
@@ -178,7 +191,6 @@ public class TestRaidPurge extends TestC
public void testPurge() throws Exception {
LOG.info("Test testPurge started.");
- String srcPaths [] = { "/user/dhruba/raidtest", "/user/dhruba/raid*" };
long blockSizes [] = {1024L};
long stripeLengths [] = {5};
long targetReplication = 1;
@@ -188,13 +200,11 @@ public class TestRaidPurge extends TestC
createClusters(true);
try {
- for (String srcPath : srcPaths ) {
- for (long blockSize : blockSizes) {
- for (long stripeLength : stripeLengths) {
- doTestPurge(iter, srcPath, targetReplication, metaReplication,
- stripeLength, blockSize, numBlock);
- iter++;
- }
+ for (long blockSize : blockSizes) {
+ for (long stripeLength : stripeLengths) {
+ doTestPurge(iter, targetReplication, metaReplication,
+ stripeLength, blockSize, numBlock);
+ iter++;
}
}
} finally {
@@ -207,12 +217,12 @@ public class TestRaidPurge extends TestC
* Create parity file, delete original file and then validate that
* parity file is automatically deleted.
*/
- private void doTestPurge(int iter, String srcPath, long targetReplication,
+ private void doTestPurge(int iter, long targetReplication,
long metaReplication, long stripeLength,
long blockSize, int numBlock) throws Exception {
LOG.info("doTestPurge started---------------------------:" + " iter " + iter +
" blockSize=" + blockSize + " stripeLength=" + stripeLength);
- mySetup(srcPath, targetReplication, metaReplication, stripeLength);
+ mySetup(targetReplication, metaReplication, stripeLength);
Path dir = new Path("/user/dhruba/raidtest/");
Path file1 = new Path(dir + "/file" + iter);
RaidNode cnode = null;
@@ -287,7 +297,7 @@ public class TestRaidPurge extends TestC
LOG.info("testPurgeHar started");
int harDelay = 0;
createClusters(true);
- mySetup("/user/dhruba/raidtest", 1, 1, 5, harDelay);
+ mySetup(1, 1, 5, harDelay);
Path dir = new Path("/user/dhruba/raidtest/");
Path destPath = new Path("/destraid/user/dhruba/raidtest");
Path file1 = new Path(dir + "/file");
@@ -324,7 +334,14 @@ public class TestRaidPurge extends TestC
boolean found = false;
FileStatus[] listPaths = null;
while (!found || listPaths == null || listPaths.length > 1) {
- listPaths = fileSys.listStatus(destPath);
+ try {
+ listPaths = fileSys.listStatus(destPath);
+ } catch (FileNotFoundException e) {
+ // If the parent directory is deleted because the har is deleted
+ // and the parent is empty, try again.
+ Thread.sleep(1000);
+ continue;
+ }
if (listPaths != null) {
for (FileStatus s: listPaths) {
LOG.info("testPurgeHar waiting for parity file to be recreated" +
@@ -348,4 +365,153 @@ public class TestRaidPurge extends TestC
stopClusters();
}
}
+
+ /**
+ * Create parity file, delete original file's directory and then validate that
+ * parity directory is automatically deleted.
+ */
+ public void testPurgeDirectory() throws Exception {
+ long stripeLength = 5;
+ long blockSize = 8192;
+ long targetReplication = 1;
+ long metaReplication = 1;
+ int numBlock = 9;
+
+ createClusters(true);
+ mySetup(targetReplication, metaReplication, stripeLength);
+ Path dir = new Path("/user/dhruba/raidtest/");
+ Path file1 = new Path(dir + "/file1");
+ RaidNode cnode = null;
+ try {
+ TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize);
+
+ // create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
+
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+
+ // delete original directory.
+ assertTrue("Unable to delete original directory " + file1 ,
+ fileSys.delete(file1.getParent(), true));
+ LOG.info("deleted file " + file1);
+
+ // wait till parity file and directory are automatically deleted
+ long start = System.currentTimeMillis();
+ while (fileSys.exists(destPath) &&
+ System.currentTimeMillis() - start < 120000) {
+ LOG.info("testPurgeDirectory waiting for parity files to be removed.");
+ Thread.sleep(1000); // keep waiting
+ }
+ assertFalse(fileSys.exists(destPath));
+
+ } catch (Exception e) {
+ LOG.info("testPurgeDirectory Exception " + e +
+ StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
+ LOG.info("testPurgeDirectory delete file " + file1);
+ fileSys.delete(file1, true);
+ stopClusters();
+ }
+ }
+
+ /**
+ * Test that an XOR parity file is removed when a RS parity file is detected.
+ */
+ public void testPurgePreference() throws Exception {
+ createClusters(true);
+ Path dir = new Path("/user/test/raidtest/");
+ Path file1 = new Path(dir + "/file1");
+
+ PolicyInfo infoXor = new PolicyInfo("testPurgePreference", conf);
+ infoXor.setSrcPath("/user/test/raidtest");
+ infoXor.setErasureCode("xor");
+ infoXor.setDescription("test policy");
+ infoXor.setProperty("targetReplication", "2");
+ infoXor.setProperty("metaReplication", "2");
+
+ PolicyInfo infoRs = new PolicyInfo("testPurgePreference", conf);
+ infoRs.setSrcPath("/user/test/raidtest");
+ infoRs.setErasureCode("rs");
+ infoRs.setDescription("test policy");
+ infoRs.setProperty("targetReplication", "1");
+ infoRs.setProperty("metaReplication", "1");
+ try {
+ TestRaidNode.createOldFile(fileSys, file1, 1, 9, 8192L);
+ FileStatus stat = fileSys.getFileStatus(file1);
+
+ // Create the parity files.
+ RaidNode.doRaid(
+ conf, infoXor, stat, new RaidNode.Statistics(), Reporter.NULL);
+ RaidNode.doRaid(
+ conf, infoRs, stat, new RaidNode.Statistics(), Reporter.NULL);
+ Path xorParity =
+ new Path(RaidNode.DEFAULT_RAID_LOCATION, "user/test/raidtest/file1");
+ Path rsParity =
+ new Path(RaidNode.DEFAULT_RAIDRS_LOCATION, "user/test/raidtest/file1");
+ assertTrue(fileSys.exists(xorParity));
+ assertTrue(fileSys.exists(rsParity));
+
+ // Check purge of a single parity file.
+ RaidNode cnode = RaidNode.createRaidNode(conf);
+ FileStatus raidRsStat =
+ fileSys.getFileStatus(new Path(RaidNode.DEFAULT_RAIDRS_LOCATION));
+ cnode.purgeMonitor.recursePurge(infoRs.getErasureCode(), fileSys, fileSys,
+ RaidNode.DEFAULT_RAIDRS_LOCATION, raidRsStat);
+
+ // Calling purge under the RS path has no effect.
+ assertTrue(fileSys.exists(xorParity));
+ assertTrue(fileSys.exists(rsParity));
+
+ FileStatus raidStat =
+ fileSys.getFileStatus(new Path(RaidNode.DEFAULT_RAID_LOCATION));
+ cnode.purgeMonitor.recursePurge(infoXor.getErasureCode(), fileSys, fileSys,
+ RaidNode.DEFAULT_RAID_LOCATION, raidStat);
+ // XOR parity must have been purged by now.
+ assertFalse(fileSys.exists(xorParity));
+ assertTrue(fileSys.exists(rsParity));
+
+ // Now check the purge of a parity har.
+ // Delete the RS parity for now.
+ fileSys.delete(rsParity);
+ // Recreate the XOR parity.
+ Path xorHar =
+ new Path(RaidNode.DEFAULT_RAID_LOCATION, "user/test/raidtest/raidtest" +
+ RaidNode.HAR_SUFFIX);
+ RaidNode.doRaid(
+ conf, infoXor, stat, new RaidNode.Statistics(), Reporter.NULL);
+ assertTrue(fileSys.exists(xorParity));
+ assertFalse(fileSys.exists(xorHar));
+
+ // Create the har.
+ long cutoff = System.currentTimeMillis();
+ cnode.recurseHar(infoXor, fileSys, raidStat,
+ RaidNode.DEFAULT_RAID_LOCATION, fileSys, cutoff,
+ RaidNode.tmpHarPathForCode(conf, infoXor.getErasureCode()));
+
+ // Call purge to get rid of the parity file. The har should remain.
+ cnode.purgeMonitor.recursePurge(infoXor.getErasureCode(), fileSys, fileSys,
+ RaidNode.DEFAULT_RAID_LOCATION, raidStat);
+ // XOR har should exist but xor parity file should have been purged.
+ assertFalse(fileSys.exists(xorParity));
+ assertTrue(fileSys.exists(xorHar));
+
+ // Now create the RS parity.
+ RaidNode.doRaid(
+ conf, infoRs, stat, new RaidNode.Statistics(), Reporter.NULL);
+ cnode.purgeMonitor.recursePurge(infoXor.getErasureCode(), fileSys, fileSys,
+ RaidNode.DEFAULT_RAID_LOCATION, raidStat);
+ // XOR har should get deleted.
+ assertTrue(fileSys.exists(rsParity));
+ assertFalse(fileSys.exists(xorParity));
+ assertFalse(fileSys.exists(xorHar));
+
+ } finally {
+ stopClusters();
+ }
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java Fri Nov 12 00:17:22 2010
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.hdfs.TestRaidDfs;
import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.raid.RaidNode;
@@ -120,16 +121,10 @@ public class TestRaidShell extends TestC
int[] corruptBlockIdxs = new int[]{0, 4, 6};
for (int idx: corruptBlockIdxs) {
LOG.info("Corrupting block " + locations.get(idx).getBlock());
- TestRaidDfs.corruptBlock(file1, locations.get(idx).getBlock(),
- NUM_DATANODES, false); // corrupt block
- long offset = idx * blockSize;
- try {
- readAndDiscard(fileSys, file1, offset, blockSize);
- fail("Expected checksumexception not thrown");
- } catch (ChecksumException e) {
- LOG.info("Block at offset " + offset + " got expected exception");
- }
+ corruptBlock(locations.get(idx).getBlock().getBlockName());
}
+ TestBlockFixer.reportCorruptBlocks(fileSys, file1, corruptBlockIdxs,
+ srcStat.getBlockSize());
String fileUriPath = file1.toUri().getPath();
waitForCorruptBlocks(corruptBlockIdxs.length, dfs, file1);
@@ -150,14 +145,9 @@ public class TestRaidShell extends TestC
long parityCrc = getCRC(fileSys, parityFile);
locations = RaidDFSUtil.getBlockLocations(
dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
- TestRaidDfs.corruptBlock(parityFile, locations.get(0).getBlock(),
- NUM_DATANODES, false); // corrupt block
- try {
- readAndDiscard(fileSys, parityFile, 0, blockSize);
- fail("Expected checksumexception not thrown");
- } catch (ChecksumException e) {
- LOG.info("Parity Block at offset 0 got expected exception");
- }
+ corruptBlock(locations.get(0).getBlock().getBlockName());
+ TestBlockFixer.reportCorruptBlocks(fileSys, parityFile, new int[]{0},
+ srcStat.getBlockSize());
waitForCorruptBlocks(1, dfs, parityFile);
args[1] = parityFile.toUri().getPath();
@@ -294,14 +284,11 @@ public class TestRaidShell extends TestC
return crc.getValue();
}
- private static void readAndDiscard(
- FileSystem fs, Path p, long offset, long length) throws IOException {
- FSDataInputStream in = fs.open(p);
- in.seek(offset);
- long count = 0;
- for (int b = 0; b >= 0 && count < length; count++) {
- b = in.read();
+ void corruptBlock(String blockName) throws IOException {
+ boolean corrupted = false;
+ for (int i = 0; i < NUM_DATANODES; i++) {
+ corrupted |= TestDatanodeBlockScanner.corruptReplica(blockName, i);
}
+ assertTrue(corrupted);
}
}
-
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.raid.RaidNode;
+
+
+public class TestReedSolomonDecoder extends TestCase {
+ final static Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.TestReedSolomonDecoder");
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+ final static int NUM_DATANODES = 3;
+
+ Configuration conf;
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+
+ public void testDecoder() throws Exception {
+ mySetup();
+ int stripeSize = 10;
+ int paritySize = 4;
+ long blockSize = 8192;
+ Path file1 = new Path("/user/raidtest/file1");
+ Path recoveredFile1 = new Path("/user/raidtest/file1.recovered");
+ Path parityFile1 = new Path("/rsraid/user/raidtest/file1");
+ long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+ 1, 25, blockSize);
+ FileStatus file1Stat = fileSys.getFileStatus(file1);
+
+ conf.setInt("raid.rsdecoder.bufsize", 512);
+ conf.setInt("raid.rsencoder.bufsize", 512);
+
+ try {
+ // First encode the file.
+ ReedSolomonEncoder encoder = new ReedSolomonEncoder(
+ conf, stripeSize, paritySize);
+ short parityRepl = 1;
+ encoder.encodeFile(fileSys, file1, fileSys, parityFile1, parityRepl,
+ Reporter.NULL);
+
+ // Ensure there are no corrupt files yet.
+ DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+ String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+ assertEquals(corruptFiles.length, 0);
+
+ // Now corrupt the file.
+ long corruptOffset = blockSize * 5;
+ FileStatus srcStat = fileSys.getFileStatus(file1);
+ LocatedBlocks locations = RaidDFSUtil.getBlockLocations(dfs,
+ file1.toUri().getPath(), 0, srcStat.getLen());
+ corruptBlock(locations.get(5).getBlock().getBlockName());
+ corruptBlock(locations.get(6).getBlock().getBlockName());
+ TestBlockFixer.reportCorruptBlocks(dfs, file1, new int[]{5, 6},
+ srcStat.getBlockSize());
+
+ // Ensure file is corrupted.
+ corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+ assertEquals(corruptFiles.length, 1);
+ assertEquals(corruptFiles[0], file1.toString());
+
+ // Fix the file.
+ ReedSolomonDecoder decoder = new ReedSolomonDecoder(
+ conf, stripeSize, paritySize);
+ decoder.decodeFile(fileSys, file1, fileSys, parityFile1,
+ corruptOffset, recoveredFile1);
+ assertTrue(TestRaidDfs.validateFile(
+ fileSys, recoveredFile1, file1Stat.getLen(), crc1));
+ } finally {
+ myTearDown();
+ }
+ }
+
+ void corruptBlock(String blockName) throws IOException {
+ boolean corrupted = false;
+ for (int i = 0; i < NUM_DATANODES; i++) {
+ corrupted |= TestDatanodeBlockScanner.corruptReplica(blockName, i);
+ }
+ assertTrue(corrupted);
+ }
+
+ private void mySetup() throws Exception {
+
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ conf = new Configuration();
+
+ // make all deletions not go through Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+ conf.setBoolean("dfs.permissions", false);
+
+ dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+ String namenode = fileSys.getUri().toString();
+ FileSystem.setDefaultUri(conf, namenode);
+ }
+
+ private void myTearDown() throws Exception {
+ if (dfs != null) { dfs.shutdown(); }
+ }
+}
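
Why two corrupted blocks are recoverable in testDecoder: blocks 5 and 6 both fall in the first stripe (blocks 0 through 9, with stripeSize 10), and a Reed-Solomon code with paritySize 4 can reconstruct up to 4 missing blocks per stripe. A small illustrative check of that bound (the per-stripe erasure limit is a standard property of the code; this helper is not part of the patch):

    // Illustrative only: RS(stripeSize, paritySize) can rebuild a stripe
    // as long as it has lost at most paritySize blocks.
    static boolean recoverable(int[] corruptBlockIdxs, int stripeSize,
        int paritySize) {
      java.util.Map<Integer, Integer> lossesPerStripe =
          new java.util.HashMap<Integer, Integer>();
      for (int idx : corruptBlockIdxs) {
        int stripe = idx / stripeSize;
        Integer n = lossesPerStripe.get(stripe);
        lossesPerStripe.put(stripe, n == null ? 1 : n + 1);
      }
      for (int losses : lossesPerStripe.values()) {
        if (losses > paritySize) {
          return false;  // too many erasures in one stripe
        }
      }
      return true;  // recoverable(new int[]{5, 6}, 10, 4) == true
    }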
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.raid.RaidNode;
+
+
+public class TestReedSolomonEncoder extends TestCase {
+ final static Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.TestReedSolomonEncoder");
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+ final static int NUM_DATANODES = 3;
+
+ Configuration conf;
+ String namenode = null;
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+
+ public void testEncoder() throws Exception {
+ mySetup();
+ int stripeSize = 10;
+ int paritySize = 4;
+ long blockSize = 8192;
+ Path file1 = new Path("/user/raidtest/file1");
+ Path parityFile1 = new Path("/rsraid/user/raidtest/file1");
+ long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+ 1, 25, blockSize);
+ try {
+ ReedSolomonEncoder encoder = new ReedSolomonEncoder(
+ conf, stripeSize, paritySize);
+ short parityRepl = 1;
+ encoder.encodeFile(fileSys, file1, fileSys, parityFile1, parityRepl,
+ Reporter.NULL);
+
+ FileStatus parityStat = fileSys.getFileStatus(parityFile1);
+ assertEquals(4*8192*3, parityStat.getLen());
+
+ } finally {
+ myTearDown();
+ }
+ }
+
+ private void mySetup() throws Exception {
+
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ conf = new Configuration();
+
+ // make all deletions not go through Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+ dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+ namenode = fileSys.getUri().toString();
+
+ FileSystem.setDefaultUri(conf, namenode);
+
+ }
+
+ private void myTearDown() throws Exception {
+ if (dfs != null) { dfs.shutdown(); }
+ }
+}
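
The length assertion in testEncoder follows directly from the encoding parameters: 25 source blocks at stripeSize 10 make ceil(25/10) = 3 stripes, and each stripe emits paritySize = 4 parity blocks of blockSize = 8192 bytes, hence 4*8192*3. A hypothetical helper capturing that arithmetic (not part of the patch):

    // parity length = ceil(numBlocks / stripeSize) * paritySize * blockSize
    static long expectedParityLength(long numBlocks, int stripeSize,
        int paritySize, long blockSize) {
      long stripes = (numBlocks + stripeSize - 1) / stripeSize;  // ceil division
      return stripes * paritySize * blockSize;  // 3 * 4 * 8192 here
    }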