Posted to mapreduce-commits@hadoop.apache.org by sc...@apache.org on 2010/11/12 01:17:23 UTC

svn commit: r1034221 [2/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/ src/contrib/raid/src/test/org/apa...

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java Fri Nov 12 00:17:22 2010
@@ -48,6 +48,7 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.RaidDFSUtil;
 import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
 import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.raid.RaidNode;
 import org.apache.hadoop.raid.RaidUtils;
 
@@ -520,7 +522,7 @@ public class TestBlockFixer extends Test
     assertTrue(corrupted);
   }
 
-  void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
+  static void reportCorruptBlocks(FileSystem fs, Path file, int[] idxs,
     long blockSize) throws IOException {
 
     FSDataInputStream in = fs.open(file);

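The only behavioral change in this hunk is making reportCorruptBlocks static so that other raid tests can reuse it. A minimal sketch of how a caller might drive it (assumed to run inside a test that already has a FileSystem handle; the path and block indexes are illustrative, not part of this commit):

    // Read through the corrupted blocks via the now-static helper so the
    // NameNode marks them corrupt, mirroring the calls added later in
    // TestRaidShell and TestReedSolomonDecoder.
    Path file = new Path("/user/test/raidtest/file1");       // illustrative path
    long blockSize = fs.getFileStatus(file).getBlockSize();
    TestBlockFixer.reportCorruptBlocks(fs, file, new int[]{0, 4}, blockSize);
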
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java Fri Nov 12 00:17:22 2010
@@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.Reporter;
+
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 
 public class TestDirectoryTraversal extends TestCase {
@@ -89,6 +91,7 @@ public class TestDirectoryTraversal exte
   }
 
   public void testSuspension() throws IOException {
+    LOG.info("Starting testSuspension");
     mySetup();
 
     try {
@@ -107,7 +110,7 @@ public class TestDirectoryTraversal exte
       Path raid = new Path("/raid");
       DirectoryTraversal.FileFilter filter =
         new RaidFilter.TimeBasedFilter(conf,
-          RaidNode.getDestinationPath(conf), 1, System.currentTimeMillis(), 0);
+          RaidNode.xorDestinationPath(conf), 1, System.currentTimeMillis(), 0);
       List<FileStatus> selected = dt.getFilteredFiles(filter, limit);
       for (FileStatus f: selected) {
         LOG.info(f.getPath());
@@ -124,6 +127,50 @@ public class TestDirectoryTraversal exte
     }
   }
 
+  public void testFileFilter() throws IOException {
+    mySetup();
+
+    try {
+      Path topDir = new Path(TEST_DIR + "/testFileFilter");
+      int targetRepl = 1;
+      createTestTree(topDir);
+      Path file = new Path(topDir.toString() + "/a/f1");
+      FileStatus stat = fs.getFileStatus(file);
+      PolicyInfo info = new PolicyInfo("testFileFilter", conf);
+      info.setSrcPath(topDir.toString());
+      info.setErasureCode("rs");
+      info.setDescription("test policy");
+      info.setProperty("targetReplication", "1");
+      info.setProperty("metaReplication", "1");
+
+      DirectoryTraversal.FileFilter timeBasedXORFilter =
+        new RaidFilter.TimeBasedFilter(conf,
+          RaidNode.xorDestinationPath(conf), targetRepl,
+            System.currentTimeMillis(), 0);
+      DirectoryTraversal.FileFilter timeBasedRSFilter =
+        new RaidFilter.TimeBasedFilter(conf,
+          RaidNode.rsDestinationPath(conf), targetRepl,
+            System.currentTimeMillis(), 0);
+      DirectoryTraversal.FileFilter preferenceForRSFilter =
+        new RaidFilter.PreferenceFilter(
+          conf, RaidNode.rsDestinationPath(conf),
+          RaidNode.xorDestinationPath(conf), 1, System.currentTimeMillis(), 0);
+
+      assertTrue(timeBasedXORFilter.check(stat));
+      assertTrue(timeBasedRSFilter.check(stat));
+      assertTrue(preferenceForRSFilter.check(stat));
+
+      RaidNode.doRaid(
+        conf, info, stat, new RaidNode.Statistics(), Reporter.NULL);
+
+      assertTrue(timeBasedXORFilter.check(stat));
+      assertFalse(timeBasedRSFilter.check(stat));
+      assertFalse(preferenceForRSFilter.check(stat));
+    } finally {
+      myTearDown();
+    }
+  }
+
   /**
    * Creates a test directory tree.
    *            top

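The new testFileFilter exercises the per-code destination paths and the new preference filter. Read together, the constructor calls above suggest the following usage pattern; this is a sketch assembled from the test, with the file path and an existing conf/fs assumed for illustration:

    // A filter "accepts" a file that still needs the corresponding parity.
    FileStatus stat = fs.getFileStatus(new Path("/user/test/raidtest/a/f1")); // illustrative
    DirectoryTraversal.FileFilter needsRs =
      new RaidFilter.TimeBasedFilter(conf, RaidNode.rsDestinationPath(conf),
        1, System.currentTimeMillis(), 0);
    DirectoryTraversal.FileFilter preferRs =
      new RaidFilter.PreferenceFilter(conf, RaidNode.rsDestinationPath(conf),
        RaidNode.xorDestinationPath(conf), 1, System.currentTimeMillis(), 0);
    if (needsRs.check(stat) && preferRs.check(stat)) {
      // No RS parity exists yet, so the file is a candidate for raiding.
      // After RaidNode.doRaid with an "rs" policy, both checks flip to false,
      // as asserted in testFileFilter above.
    }
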
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java Fri Nov 12 00:17:22 2010
@@ -65,6 +65,7 @@ public class TestRaidFilter extends Test
 
     PolicyInfo info1 = new PolicyInfo("p1", conf);
     info1.setSrcPath(src1.toString());
+    info1.setErasureCode("xor");
     info1.setDescription("test policy");
     info1.setProperty("targetReplication", "1");
     info1.setProperty("metaReplication", "1");
@@ -72,6 +73,7 @@ public class TestRaidFilter extends Test
 
     PolicyInfo info2 = new PolicyInfo("p2", conf);
     info2.setSrcPath(src2.toString());
+    info2.setErasureCode("xor");
     info2.setDescription("test policy");
     info2.setProperty("targetReplication", "1");
     info2.setProperty("metaReplication", "1");
@@ -103,7 +105,7 @@ public class TestRaidFilter extends Test
 
       RaidFilter.Statistics stats = new RaidFilter.Statistics();
       RaidFilter.TimeBasedFilter filter = new RaidFilter.TimeBasedFilter(
-        conf, RaidNode.getDestinationPath(conf), info1, all,
+        conf, RaidNode.xorDestinationPath(conf), info1, all,
         System.currentTimeMillis(), stats);
       System.out.println("Stats " + stats);
 

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Fri Nov 12 00:17:22 2010
@@ -113,7 +113,7 @@ public class TestRaidHar extends TestCas
     String str = "<configuration> " +
                    "<srcPath prefix=\"/user/test/raidtest\"> " +
                      "<policy name = \"RaidTest1\"> " +
-                        "<destPath> /destraid</destPath> " +
+                        "<erasureCode>xor</erasureCode> " +
                         "<property> " +
                           "<name>targetReplication</name> " +
                           "<value>" + targetReplication + "</value> " +
@@ -247,6 +247,9 @@ public class TestRaidHar extends TestCas
             }
           }
           if (count == 1  && listPaths.length == 1) {
+            Path partfile = new Path(harPath, "part-0");
+            assertEquals(fileSys.getFileStatus(partfile).getReplication(),
+              targetReplication);
             break;
           }
         } catch (FileNotFoundException e) {

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java Fri Nov 12 00:17:22 2010
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.MiniMRCl
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
 
 /**
   * Test the generation of parity blocks for files with different block
@@ -122,12 +123,22 @@ public class TestRaidNode extends TestCa
       policies = new java.util.ArrayList<String>();
     }
 
+    public void addPolicy(String name, String path, String parent) {
+      String str =
+        "<srcPath prefix=\"" + path + "\"> " +
+          "<policy name = \"" + name + "\"> " +
+             "<parentPolicy>" + parent + "</parentPolicy>" +
+          "</policy>" +
+        "</srcPath>";
+      policies.add(str);
+    }
+
     public void addPolicy(String name, short srcReplication,
                           long targetReplication, long metaReplication, long stripeLength) {
       String str =
         "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
           "<policy name = \"" + name + "\"> " +
-             "<destPath> /destraid</destPath> " +
+             "<erasureCode>xor</erasureCode> " +
              "<property> " +
                "<name>srcReplication</name> " +
                "<value>" + srcReplication + "</value> " +
@@ -169,7 +180,7 @@ public class TestRaidNode extends TestCa
       String str =
         "<srcPath prefix=\"" + path + "\"> " +
           "<policy name = \"" + name + "\"> " +
-             "<destPath> /destraid</destPath> " +
+             "<erasureCode>xor</erasureCode> " +
              "<property> " +
                "<name>srcReplication</name> " +
                "<value>" + srcReplication + "</value> " +
@@ -280,7 +291,6 @@ public class TestRaidNode extends TestCa
 
       // create an instance of the RaidNode
       cnode = RaidNode.createRaidNode(null, conf);
-      int times = 10;
 
       FileStatus[] listPaths = null;
 
@@ -345,7 +355,7 @@ public class TestRaidNode extends TestCa
                                           StringUtils.stringifyException(e));
       throw e;
     } finally {
-      shell.close();
+      if (shell != null) shell.close();
       if (cnode != null) { cnode.stop(); cnode.join(); }
       LOG.info("doTestPathFilter delete file " + file1);
       fileSys.delete(file1, true);
@@ -388,8 +398,14 @@ public class TestRaidNode extends TestCa
       long firstmodtime = 0;
       // wait till file is raided
       while (true) {
-        Thread.sleep(20000L);                  // waiting
-        listPaths = fileSys.listStatus(destPath);
+        Thread.sleep(1000);                  // waiting
+        try {
+          listPaths = fileSys.listStatus(destPath);
+        } catch (FileNotFoundException e) {
+          LOG.warn("File not found " + destPath);
+          // The directory may have been deleted by the purge thread.
+          continue;
+        }
         int count = 0;
         if (listPaths != null && listPaths.length == 1) {
           for (FileStatus s : listPaths) {
@@ -554,7 +570,7 @@ public class TestRaidNode extends TestCa
                      long crc, long corruptOffset) throws IOException {
     // recover the file assuming that we encountered a corruption at offset 0
     String[] args = new String[3];
-    args[0] = "recover";
+    args[0] = "-recover";
     args[1] = file1.toString();
     args[2] = Long.toString(corruptOffset);
     Path recover1 = shell.recover(args[0], args, 1)[0];

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java Fri Nov 12 00:17:22 2010
@@ -47,9 +47,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.hdfs.TestRaidDfs;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
 
 /**
  * If a file gets deleted, then verify that the parity file gets deleted too.
@@ -61,6 +63,7 @@ public class TestRaidPurge extends TestC
       "test-raid.xml").getAbsolutePath();
   final static long RELOAD_INTERVAL = 1000;
   final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode");
+  final Random rand = new Random();
 
   {
     ((Log4JLogger)RaidNode.LOG).getLogger().setLevel(Level.ALL);
@@ -69,8 +72,11 @@ public class TestRaidPurge extends TestC
 
   Configuration conf;
   String namenode = null;
+  String hftp = null;
   MiniDFSCluster dfs = null;
+  MiniMRCluster mr = null;
   FileSystem fileSys = null;
+  String jobTrackerName = null;
 
   /**
    * create mapreduce and dfs clusters
@@ -97,7 +103,7 @@ public class TestRaidPurge extends TestC
     }
 
     conf.set("raid.server.address", "localhost:0");
-    
+
     // create a dfs and map-reduce cluster
     final int taskTrackers = 4;
     final int jobTrackerPort = 60050;
@@ -106,25 +112,31 @@ public class TestRaidPurge extends TestC
     dfs.waitActive();
     fileSys = dfs.getFileSystem();
     namenode = fileSys.getUri().toString();
+    mr = new MiniMRCluster(taskTrackers, namenode, 3);
+    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", jobTrackerName);
   }
     
   /**
    * create raid.xml file for RaidNode
    */
-  private void mySetup(String srcPath, long targetReplication,
-                long metaReplication, long stripeLength)
-    throws Exception {
-    mySetup(srcPath, targetReplication, metaReplication, stripeLength, 1);
+  private void mySetup(long targetReplication,
+    long metaReplication, long stripeLength) throws Exception {
+    int harDelay = 1; // 1 day.
+    mySetup(targetReplication, metaReplication, stripeLength, harDelay);
   }
 
-  private void mySetup(String srcPath, long targetReplication,
-                long metaReplication, long stripeLength, int harDelay)
-    throws Exception {
+  private void mySetup(long targetReplication,
+    long metaReplication, long stripeLength, int harDelay) throws Exception {
     FileWriter fileWriter = new FileWriter(CONFIG_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     String str = "<configuration> " +
-                   "<srcPath prefix=\"" + srcPath + "\"> " +
+                   "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
                      "<policy name = \"RaidTest1\"> " +
+                        "<erasureCode>xor</erasureCode> " +
                         "<destPath> /destraid</destPath> " +
                         "<property> " +
                           "<name>targetReplication</name> " +
@@ -154,7 +166,7 @@ public class TestRaidPurge extends TestC
                         "<property> " +
                           "<name>time_before_har</name> " +
                           "<value> " + harDelay + "</value> " +
-                          "<description> time before har'ing parity files" +
+                          "<description> amount of time waited before har'ing parity files" +
                           "</description> " +
                         "</property> " +
                      "</policy>" +
@@ -168,6 +180,7 @@ public class TestRaidPurge extends TestC
    * stop clusters created earlier
    */
   private void stopClusters() throws Exception {
+    if (mr != null) { mr.shutdown(); }
     if (dfs != null) { dfs.shutdown(); }
   }
 
@@ -178,7 +191,6 @@ public class TestRaidPurge extends TestC
   public void testPurge() throws Exception {
     LOG.info("Test testPurge  started.");
 
-    String srcPaths    []  = { "/user/dhruba/raidtest", "/user/dhruba/raid*" };
     long blockSizes    []  = {1024L};
     long stripeLengths []  = {5};
     long targetReplication = 1;
@@ -188,13 +200,11 @@ public class TestRaidPurge extends TestC
 
     createClusters(true);
     try {
-      for (String srcPath : srcPaths ) {
-        for (long blockSize : blockSizes) {
-          for (long stripeLength : stripeLengths) {
-            doTestPurge(iter, srcPath, targetReplication, metaReplication,
-                stripeLength, blockSize, numBlock);
-            iter++;
-          }
+      for (long blockSize : blockSizes) {
+        for (long stripeLength : stripeLengths) {
+           doTestPurge(iter, targetReplication, metaReplication,
+                       stripeLength, blockSize, numBlock);
+           iter++;
         }
       }
     } finally {
@@ -207,12 +217,12 @@ public class TestRaidPurge extends TestC
    * Create parity file, delete original file and then validate that
    * parity file is automatically deleted.
    */
-  private void doTestPurge(int iter, String srcPath, long targetReplication,
+  private void doTestPurge(int iter, long targetReplication,
                           long metaReplication, long stripeLength,
                           long blockSize, int numBlock) throws Exception {
     LOG.info("doTestPurge started---------------------------:" +  " iter " + iter +
              " blockSize=" + blockSize + " stripeLength=" + stripeLength);
-    mySetup(srcPath, targetReplication, metaReplication, stripeLength);
+    mySetup(targetReplication, metaReplication, stripeLength);
     Path dir = new Path("/user/dhruba/raidtest/");
     Path file1 = new Path(dir + "/file" + iter);
     RaidNode cnode = null;
@@ -287,7 +297,7 @@ public class TestRaidPurge extends TestC
     LOG.info("testPurgeHar started");
     int harDelay = 0;
     createClusters(true);
-    mySetup("/user/dhruba/raidtest", 1, 1, 5, harDelay);
+    mySetup(1, 1, 5, harDelay);
     Path dir = new Path("/user/dhruba/raidtest/");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
     Path file1 = new Path(dir + "/file");
@@ -324,7 +334,14 @@ public class TestRaidPurge extends TestC
       boolean found = false;
       FileStatus[] listPaths = null;
       while (!found || listPaths == null || listPaths.length > 1) {
-        listPaths = fileSys.listStatus(destPath);
+        try {
+          listPaths = fileSys.listStatus(destPath);
+        } catch (FileNotFoundException e) {
+          // If the parent directory is deleted because the har is deleted
+          // and the parent is empty, try again.
+          Thread.sleep(1000);
+          continue;
+        }
         if (listPaths != null) {
           for (FileStatus s: listPaths) {
             LOG.info("testPurgeHar waiting for parity file to be recreated" +
@@ -348,4 +365,153 @@ public class TestRaidPurge extends TestC
       stopClusters();
     }
   }
+
+  /**
+   * Create parity file, delete original file's directory and then validate that
+   * parity directory is automatically deleted.
+   */
+  public void testPurgeDirectory() throws Exception {
+    long stripeLength = 5;
+    long blockSize = 8192;
+    long targetReplication = 1;
+    long metaReplication   = 1;
+    int  numBlock          = 9;
+
+    createClusters(true);
+    mySetup(targetReplication, metaReplication, stripeLength);
+    Path dir = new Path("/user/dhruba/raidtest/");
+    Path file1 = new Path(dir + "/file1");
+    RaidNode cnode = null;
+    try {
+      TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize);
+
+      // create an instance of the RaidNode
+      Configuration localConf = new Configuration(conf);
+      localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+      cnode = RaidNode.createRaidNode(null, localConf);
+
+      Path destPath = new Path("/destraid/user/dhruba/raidtest");
+      TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+
+      // delete original directory.
+      assertTrue("Unable to delete original directory " + file1 ,
+                 fileSys.delete(file1.getParent(), true));
+      LOG.info("deleted file " + file1);
+
+      // wait till parity file and directory are automatically deleted
+      long start = System.currentTimeMillis();
+      while (fileSys.exists(destPath) &&
+            System.currentTimeMillis() - start < 120000) {
+        LOG.info("testPurgeDirectory waiting for parity files to be removed.");
+        Thread.sleep(1000);                  // keep waiting
+      }
+      assertFalse(fileSys.exists(destPath));
+
+    } catch (Exception e) {
+      LOG.info("testPurgeDirectory Exception " + e +
+                                          StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      if (cnode != null) { cnode.stop(); cnode.join(); }
+      LOG.info("testPurgeDirectory delete file " + file1);
+      fileSys.delete(file1, true);
+      stopClusters();
+    }
+  }
+
+  /**
+   * Test that an XOR parity file is removed when an RS parity file is detected.
+   */
+  public void testPurgePreference() throws Exception {
+    createClusters(true);
+    Path dir = new Path("/user/test/raidtest/");
+    Path file1 = new Path(dir + "/file1");
+
+    PolicyInfo infoXor = new PolicyInfo("testPurgePreference", conf);
+    infoXor.setSrcPath("/user/test/raidtest");
+    infoXor.setErasureCode("xor");
+    infoXor.setDescription("test policy");
+    infoXor.setProperty("targetReplication", "2");
+    infoXor.setProperty("metaReplication", "2");
+
+    PolicyInfo infoRs = new PolicyInfo("testPurgePreference", conf);
+    infoRs.setSrcPath("/user/test/raidtest");
+    infoRs.setErasureCode("rs");
+    infoRs.setDescription("test policy");
+    infoRs.setProperty("targetReplication", "1");
+    infoRs.setProperty("metaReplication", "1");
+    try {
+      TestRaidNode.createOldFile(fileSys, file1, 1, 9, 8192L);
+      FileStatus stat = fileSys.getFileStatus(file1);
+
+      // Create the parity files.
+      RaidNode.doRaid(
+        conf, infoXor, stat, new RaidNode.Statistics(), Reporter.NULL);
+      RaidNode.doRaid(
+        conf, infoRs, stat, new RaidNode.Statistics(), Reporter.NULL);
+      Path xorParity =
+        new Path(RaidNode.DEFAULT_RAID_LOCATION, "user/test/raidtest/file1");
+      Path rsParity =
+        new Path(RaidNode.DEFAULT_RAIDRS_LOCATION, "user/test/raidtest/file1");
+      assertTrue(fileSys.exists(xorParity));
+      assertTrue(fileSys.exists(rsParity));
+
+      // Check purge of a single parity file.
+      RaidNode cnode = RaidNode.createRaidNode(conf);
+      FileStatus raidRsStat =
+        fileSys.getFileStatus(new Path(RaidNode.DEFAULT_RAIDRS_LOCATION));
+      cnode.purgeMonitor.recursePurge(infoRs.getErasureCode(), fileSys, fileSys,
+         RaidNode.DEFAULT_RAIDRS_LOCATION, raidRsStat);
+
+      // Calling purge under the RS path has no effect.
+      assertTrue(fileSys.exists(xorParity));
+      assertTrue(fileSys.exists(rsParity));
+
+      FileStatus raidStat =
+         fileSys.getFileStatus(new Path(RaidNode.DEFAULT_RAID_LOCATION));
+      cnode.purgeMonitor.recursePurge(infoXor.getErasureCode(), fileSys, fileSys,
+         RaidNode.DEFAULT_RAID_LOCATION, raidStat);
+      // XOR parity must have been purged by now.
+      assertFalse(fileSys.exists(xorParity));
+      assertTrue(fileSys.exists(rsParity));
+
+      // Now check the purge of a parity har.
+      // Delete the RS parity for now.
+      fileSys.delete(rsParity);
+      // Recreate the XOR parity.
+      Path xorHar =
+        new Path(RaidNode.DEFAULT_RAID_LOCATION, "user/test/raidtest/raidtest" +
+          RaidNode.HAR_SUFFIX);
+      RaidNode.doRaid(
+        conf, infoXor, stat, new RaidNode.Statistics(), Reporter.NULL);
+      assertTrue(fileSys.exists(xorParity));
+      assertFalse(fileSys.exists(xorHar));
+
+      // Create the har.
+      long cutoff = System.currentTimeMillis();
+      cnode.recurseHar(infoXor, fileSys, raidStat,
+        RaidNode.DEFAULT_RAID_LOCATION, fileSys, cutoff,
+        RaidNode.tmpHarPathForCode(conf, infoXor.getErasureCode()));
+
+      // Call purge to get rid of the parity file. The har should remain.
+      cnode.purgeMonitor.recursePurge(infoXor.getErasureCode(), fileSys, fileSys,
+         RaidNode.DEFAULT_RAID_LOCATION, raidStat);
+      // XOR har should exist but xor parity file should have been purged.
+      assertFalse(fileSys.exists(xorParity));
+      assertTrue(fileSys.exists(xorHar));
+
+      // Now create the RS parity.
+      RaidNode.doRaid(
+        conf, infoRs, stat, new RaidNode.Statistics(), Reporter.NULL);
+      cnode.purgeMonitor.recursePurge(infoXor.getErasureCode(), fileSys, fileSys,
+         RaidNode.DEFAULT_RAID_LOCATION, raidStat);
+      // XOR har should get deleted.
+      assertTrue(fileSys.exists(rsParity));
+      assertFalse(fileSys.exists(xorParity));
+      assertFalse(fileSys.exists(xorHar));
+
+    } finally {
+      stopClusters();
+    }
+  }
 }

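Several of the new tests above (testFileFilter, testPurgePreference, and the Reed-Solomon tests below) share one pattern for creating parity programmatically instead of going through raid.xml: build a PolicyInfo, choose the erasure code, and hand it to RaidNode.doRaid. A condensed sketch of that pattern, with the policy name, path and replication values purely illustrative:

    // Programmatic raiding, as used repeatedly in this commit's tests.
    PolicyInfo info = new PolicyInfo("example-policy", conf);  // name is illustrative
    info.setSrcPath("/user/test/raidtest");
    info.setErasureCode("rs");               // or "xor" for XOR parity
    info.setDescription("test policy");
    info.setProperty("targetReplication", "1");
    info.setProperty("metaReplication", "1");

    FileStatus stat = fileSys.getFileStatus(new Path("/user/test/raidtest/file1"));
    RaidNode.doRaid(conf, info, stat, new RaidNode.Statistics(), Reporter.NULL);
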
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java Fri Nov 12 00:17:22 2010
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
 import org.apache.hadoop.hdfs.TestRaidDfs;
 import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
 import org.apache.hadoop.raid.RaidNode;
 
 
@@ -120,16 +121,10 @@ public class TestRaidShell extends TestC
       int[] corruptBlockIdxs = new int[]{0, 4, 6};
       for (int idx: corruptBlockIdxs) {
         LOG.info("Corrupting block " + locations.get(idx).getBlock());
-        TestRaidDfs.corruptBlock(file1, locations.get(idx).getBlock(),
-                                  NUM_DATANODES, false); // corrupt block
-        long offset = idx * blockSize;
-        try {
-          readAndDiscard(fileSys, file1, offset, blockSize);
-          fail("Expected checksumexception not thrown");
-        } catch (ChecksumException e) {
-          LOG.info("Block at offset " + offset + " got expected exception");
-        }
+        corruptBlock(locations.get(idx).getBlock().getBlockName());
       }
+      TestBlockFixer.reportCorruptBlocks(fileSys, file1, corruptBlockIdxs,
+        srcStat.getBlockSize());
 
       String fileUriPath = file1.toUri().getPath();
       waitForCorruptBlocks(corruptBlockIdxs.length, dfs, file1);
@@ -150,14 +145,9 @@ public class TestRaidShell extends TestC
       long parityCrc = getCRC(fileSys, parityFile);
       locations = RaidDFSUtil.getBlockLocations(
         dfs, parityFile.toUri().getPath(), 0, parityStat.getLen());
-      TestRaidDfs.corruptBlock(parityFile, locations.get(0).getBlock(),
-                                NUM_DATANODES, false); // corrupt block
-      try {
-        readAndDiscard(fileSys, parityFile, 0, blockSize);
-        fail("Expected checksumexception not thrown");
-      } catch (ChecksumException e) {
-        LOG.info("Parity Block at offset 0 got expected exception");
-      }
+      corruptBlock(locations.get(0).getBlock().getBlockName());
+      TestBlockFixer.reportCorruptBlocks(fileSys, parityFile, new int[]{0},
+        srcStat.getBlockSize());
       waitForCorruptBlocks(1, dfs, parityFile);
 
       args[1] = parityFile.toUri().getPath();
@@ -294,14 +284,11 @@ public class TestRaidShell extends TestC
     return crc.getValue();
   }
 
-  private static void readAndDiscard(
-    FileSystem fs, Path p, long offset, long length) throws IOException {
-    FSDataInputStream in = fs.open(p);
-    in.seek(offset);
-    long count = 0;
-    for (int b = 0; b >= 0 && count < length; count++) {
-      b = in.read();
+  void corruptBlock(String blockName) throws IOException {
+    boolean corrupted = false;
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      corrupted |= TestDatanodeBlockScanner.corruptReplica(blockName, i);
     }
+    assertTrue(corrupted);
   }
 }
-

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.raid.RaidNode;
+
+
+public class TestReedSolomonDecoder extends TestCase {
+  final static Log LOG = LogFactory.getLog(
+                            "org.apache.hadoop.raid.TestReedSolomonDecoder");
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+  final static int NUM_DATANODES = 3;
+
+  Configuration conf;
+  MiniDFSCluster dfs = null;
+  FileSystem fileSys = null;
+
+  public void testDecoder() throws Exception {
+    mySetup();
+    int stripeSize = 10;
+    int paritySize = 4;
+    long blockSize = 8192;
+    Path file1 = new Path("/user/raidtest/file1");
+    Path recoveredFile1 = new Path("/user/raidtest/file1.recovered");
+    Path parityFile1 = new Path("/rsraid/user/raidtest/file1");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 25, blockSize);
+    FileStatus file1Stat = fileSys.getFileStatus(file1);
+
+    conf.setInt("raid.rsdecoder.bufsize", 512);
+    conf.setInt("raid.rsencoder.bufsize", 512);
+
+    try {
+      // First encode the file.
+      ReedSolomonEncoder encoder = new ReedSolomonEncoder(
+        conf, stripeSize, paritySize);
+      short parityRepl = 1;
+      encoder.encodeFile(fileSys, file1, fileSys, parityFile1, parityRepl,
+        Reporter.NULL);
+
+      // Ensure there are no corrupt files yet.
+      DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+      String[] corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 0);
+
+      // Now corrupt the file.
+      long corruptOffset = blockSize * 5;
+      FileStatus srcStat = fileSys.getFileStatus(file1);
+      LocatedBlocks locations = RaidDFSUtil.getBlockLocations(dfs,
+          file1.toUri().getPath(), 0, srcStat.getLen());
+      corruptBlock(locations.get(5).getBlock().getBlockName());
+      corruptBlock(locations.get(6).getBlock().getBlockName());
+      TestBlockFixer.reportCorruptBlocks(dfs, file1, new int[]{5, 6},
+          srcStat.getBlockSize());
+
+      // Ensure file is corrupted.
+      corruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+      assertEquals(corruptFiles.length, 1);
+      assertEquals(corruptFiles[0], file1.toString());
+
+      // Fix the file.
+      ReedSolomonDecoder decoder = new ReedSolomonDecoder(
+        conf, stripeSize, paritySize);
+      decoder.decodeFile(fileSys, file1, fileSys, parityFile1,
+                corruptOffset, recoveredFile1);
+      assertTrue(TestRaidDfs.validateFile(
+                    fileSys, recoveredFile1, file1Stat.getLen(), crc1));
+    } finally {
+      myTearDown();
+    }
+  }
+
+  void corruptBlock(String blockName) throws IOException {
+    boolean corrupted = false;
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      corrupted |= TestDatanodeBlockScanner.corruptReplica(blockName, i);
+    }
+    assertTrue(corrupted);
+  }
+
+  private void mySetup() throws Exception {
+
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+    conf.setBoolean("dfs.permissions", false);
+
+    dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    String namenode = fileSys.getUri().toString();
+    FileSystem.setDefaultUri(conf, namenode);
+  }
+
+  private void myTearDown() throws Exception {
+    if (dfs != null) { dfs.shutdown(); }
+  }
+}

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.raid.RaidNode;
+
+
+public class TestReedSolomonEncoder extends TestCase {
+  final static Log LOG = LogFactory.getLog(
+                            "org.apache.hadoop.raid.TestReedSolomonEncoder");
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+  final static int NUM_DATANODES = 3;
+
+  Configuration conf;
+  String namenode = null;
+  MiniDFSCluster dfs = null;
+  FileSystem fileSys = null;
+
+  public void testEncoder() throws Exception {
+    mySetup();
+    int stripeSize = 10;
+    int paritySize = 4;
+    long blockSize = 8192;
+    Path file1 = new Path("/user/raidtest/file1");
+    Path parityFile1 = new Path("/rsraid/user/raidtest/file1");
+    long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+                                                          1, 25, blockSize);
+    try {
+      ReedSolomonEncoder encoder = new ReedSolomonEncoder(
+        conf, stripeSize, paritySize);
+      short parityRepl = 1;
+      encoder.encodeFile(fileSys, file1, fileSys, parityFile1, parityRepl,
+        Reporter.NULL);
+
+      FileStatus parityStat = fileSys.getFileStatus(parityFile1);
+      assertEquals(4*8192*3, parityStat.getLen());
+
+    } finally {
+      myTearDown();
+    }
+  }
+
+  private void mySetup() throws Exception {
+
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+    dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    namenode = fileSys.getUri().toString();
+
+    FileSystem.setDefaultUri(conf, namenode);
+
+  }
+
+  private void myTearDown() throws Exception {
+    if (dfs != null) { dfs.shutdown(); }
+  }
+}
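
For the assertion at the end of testEncoder, the expected parity length follows from the test parameters: 25 source blocks with stripeSize 10 give ceil(25/10) = 3 stripes, and each stripe produces paritySize = 4 parity blocks of blockSize 8192 bytes. A back-of-envelope check (not RaidNode code):

    // 3 stripes * 4 parity blocks per stripe * 8192 bytes per block = 98304,
    // which is the 4*8192*3 expected by the assertEquals above.
    int stripes = (int) Math.ceil(25.0 / 10);             // = 3
    long expectedParityLen = (long) stripes * 4 * 8192;   // = 98304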