Posted to mapreduce-commits@hadoop.apache.org by sc...@apache.org on 2010/11/12 01:17:23 UTC

svn commit: r1034221 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/raid/src/java/org/apache/hadoop/hdfs/ src/contrib/raid/src/java/org/apache/hadoop/raid/ src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/ src/contrib/raid/src/test/org/apa...

Author: schen
Date: Fri Nov 12 00:17:22 2010
New Revision: 1034221

URL: http://svn.apache.org/viewvc?rev=1034221&view=rev
Log:
MAPREDUCE-2169. Integrated Reed-Solomon code with RaidNode. (Ramkumar Vadali via schen)
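
For context on what "integrated" means here: XOR raid computes a single
parity block per stripe, so it can repair one lost block per stripe.
Reed-Solomon parity (ReedSolomonCode from MAPREDUCE-1970, wrapped here by the
new ReedSolomonEncoder and ReedSolomonDecoder) computes rsParityLength parity
blocks per stripe and can repair that many lost blocks. A toy illustration of
the XOR case, standalone and not part of the RaidNode API:

    // Illustrative only: one XOR parity block over a stripe of
    // equal-sized data blocks. XORing the parity with the surviving
    // blocks rebuilds any single missing block.
    static byte[] xorParity(byte[][] stripe, int blockSize) {
      byte[] parity = new byte[blockSize];
      for (byte[] block : stripe) {
        for (int i = 0; i < blockSize; i++) {
          parity[i] ^= block[i];
        }
      }
      return parity;
    }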

Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Nov 12 00:17:22 2010
@@ -21,6 +21,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1970.  Reed-Solomon code implementation for HDFS RAID.
     (Scott Chen via dhruba)
 
+    MAPREDUCE-2169. Integrated Reed-Solomon code with RaidNode. (Ramkumar
+    Vadali via schen)
+
   IMPROVEMENTS
 
     MAPREDUCE-2141. Add an "extra data" field to Task for use by Mesos. (matei)

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Fri Nov 12 00:17:22 2010
@@ -20,26 +20,33 @@ package org.apache.hadoop.hdfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.DataInput;
+import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.raid.Decoder;
 import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.ReedSolomonDecoder;
 import org.apache.hadoop.raid.XORDecoder;
-import org.apache.hadoop.hdfs.BlockMissingException;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
 
 /**
  * This is an implementation of the Hadoop  RAID Filesystem. This FileSystem 
@@ -51,7 +58,7 @@ import org.apache.hadoop.hdfs.Distribute
 public class DistributedRaidFileSystem extends FilterFileSystem {
 
   // these are alternate locations that can be used for read-only access
-  Path[]     alternates;
+  DecodeInfo[] alternates;
   Configuration conf;
   int stripeLength;
 
@@ -64,6 +71,30 @@ public class DistributedRaidFileSystem e
     stripeLength = 0;
   }
 
+  // Information required for decoding a source file
+  static private class DecodeInfo {
+    final Path destPath;
+    final ErasureCodeType type;
+    final Configuration conf;
+    final int stripeLength;
+    private DecodeInfo(Configuration conf, ErasureCodeType type, Path destPath) {
+      this.conf = conf;
+      this.type = type;
+      this.destPath = destPath;
+      this.stripeLength = RaidNode.getStripeLength(conf);
+    }
+
+    Decoder createDecoder() {
+      if (this.type == ErasureCodeType.XOR) {
+        return new XORDecoder(conf, stripeLength);
+      } else if (this.type == ErasureCodeType.RS) {
+        return new ReedSolomonDecoder(conf, stripeLength,
+                              RaidNode.rsParityLength(conf));
+      }
+      return null;
+    }
+  }
+
   /* Initialize a Raid FileSystem
    */
   public void initialize(URI name, Configuration conf) throws IOException {
@@ -78,23 +109,6 @@ public class DistributedRaidFileSystem e
     this.fs = (FileSystem)ReflectionUtils.newInstance(clazz, null); 
     super.initialize(name, conf);
     
-    String alt = conf.get("hdfs.raid.locations");
-    
-    // If no alternates are specified, then behave absolutely same as 
-    // the original file system.
-    if (alt == null || alt.length() == 0) {
-      LOG.info("hdfs.raid.locations not defined. Using defaults...");
-      alt = RaidNode.DEFAULT_RAID_LOCATION;
-    }
-
-    // fs.alternate.filesystem.prefix can be of the form:
-    // "hdfs://host:port/myPrefixPath, file:///localPrefix,hftp://host1:port1/"
-    String[] strs  = alt.split(",");
-    if (strs == null || strs.length == 0) {
-      LOG.info("hdfs.raid.locations badly defined. Ignoring...");
-      return;
-    }
-
     // find stripe length configured
     stripeLength = RaidNode.getStripeLength(conf);
     if (stripeLength == 0) {
@@ -103,12 +117,12 @@ public class DistributedRaidFileSystem e
       return;
     }
 
-    // create a reference to all underlying alternate path prefix
-    alternates = new Path[strs.length];
-    for (int i = 0; i < strs.length; i++) {
-      alternates[i] = new Path(strs[i].trim());
-      alternates[i] = alternates[i].makeQualified(fs);
-    }
+    // Put XOR and RS in alternates
+    alternates = new DecodeInfo[2];
+    Path xorPath = RaidNode.xorDestinationPath(conf, fs);
+    alternates[0] = new DecodeInfo(conf, ErasureCodeType.XOR, xorPath);
+    Path rsPath = RaidNode.rsDestinationPath(conf, fs);
+    alternates[1] = new DecodeInfo(conf, ErasureCodeType.RS, rsPath);
   }
 
   /*
@@ -153,13 +167,13 @@ public class DistributedRaidFileSystem e
       private int nextLocation;
       private DistributedRaidFileSystem lfs;
       private Path path;
-      private final Path[] alternates;
+      private final DecodeInfo[] alternates;
       private final int buffersize;
       private final Configuration conf;
       private final int stripeLength;
 
-      ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs, Path[] alternates,
-                       Path path, int stripeLength, int buffersize)
+      ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs,
+          DecodeInfo[] alternates, Path path, int stripeLength, int buffersize)
           throws IOException {
         this.underLyingStream = lfs.fs.open(path, buffersize);
         this.path = path;
@@ -345,10 +359,13 @@ public class DistributedRaidFileSystem e
             clientConf.set("fs.hdfs.impl", clazz.getName());
             // Disable caching so that a previously cached RaidDfs is not used.
             clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-            Decoder decoder =
-              new XORDecoder(clientConf, RaidNode.getStripeLength(clientConf));
-            Path npath = RaidNode.unRaid(clientConf, path, alternates[idx],
-                              decoder, stripeLength, corruptOffset);
+            Path npath = RaidNode.unRaid(clientConf, path,
+                         alternates[idx].destPath,
+                         alternates[idx].createDecoder(),
+                         stripeLength, corruptOffset);
+            if (npath == null)
+              continue;
+
             FileSystem fs1 = getUnderlyingFileSystem(conf);
             fs1.initialize(npath.toUri(), conf);
             LOG.info("Opening alternate path " + npath + " at offset " + curpos);
@@ -392,7 +409,7 @@ public class DistributedRaidFileSystem e
      * @throws IOException
      */
     public ExtFSDataInputStream(Configuration conf, DistributedRaidFileSystem lfs,
-      Path[] alternates, Path  p, int stripeLength, int buffersize) throws IOException {
+      DecodeInfo[] alternates, Path  p, int stripeLength, int buffersize) throws IOException {
         super(new ExtFsInputStream(conf, lfs, alternates, p, stripeLength, buffersize));
     }
   }
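
The net effect of the DistributedRaidFileSystem changes: the flat list of
alternate parity locations becomes one DecodeInfo per erasure code, and a
corrupt read walks them in order (XOR first, then RS). A condensed sketch of
that retry loop; recoverFrom() and openAt() are hypothetical stand-ins for
the RaidNode.unRaid call and stream reopen that ExtFsInputStream actually
performs:

    // Sketch only, assuming this runs inside the read-retry path.
    for (DecodeInfo alternate : alternates) {
      Decoder decoder = alternate.createDecoder();  // XORDecoder or ReedSolomonDecoder
      Path recovered = recoverFrom(alternate.destPath, decoder, corruptOffset);
      if (recovered == null) {
        continue;  // no usable parity of this type; try the next code
      }
      return openAt(recovered, curpos);  // resume reading from the recovered copy
    }
    throw new IOException("no alternate parity could recover the corrupt block");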

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java Fri Nov 12 00:17:22 2010
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.raid;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -60,6 +59,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
@@ -71,36 +71,73 @@ import org.apache.hadoop.net.NetUtils;
 
 import org.apache.hadoop.raid.RaidNode;
 import org.apache.hadoop.raid.RaidUtils;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
+
 
 /**
- * This class fixes source file blocks using the parity file,
- * and parity file blocks using the source file.
- * It periodically fetches the list of corrupt files from the namenode,
- * and figures out the location of the bad block by reading through
- * the corrupt file.
+ * Contains the core functionality of the block fixer.
+ *
+ * raid.blockfix.interval          - interval between checks for corrupt files
+ *
+ * raid.blockfix.history.interval  - interval before fixing the same file again
+ *
+ * raid.blockfix.read.timeout      - read timeout
+ *
+ * raid.blockfix.write.timeout     - write timeout
  */
-public class BlockFixer implements Runnable {
+public class BlockFixer extends Configured implements Runnable {
   public static final Log LOG = LogFactory.getLog(
                                   "org.apache.hadoop.raid.BlockFixer");
+
+  public static final String BLOCKFIX_INTERVAL = "raid.blockfix.interval";
+  public static final String BLOCKFIX_HISTORY_INTERVAL =
+    "raid.blockfix.history.interval";
+  public static final String BLOCKFIX_READ_TIMEOUT =
+    "raid.blockfix.read.timeout";
+  public static final String BLOCKFIX_WRITE_TIMEOUT =
+    "raid.blockfix.write.timeout";
+
+  public static final long DEFAULT_BLOCKFIX_INTERVAL = 60 * 1000; // 1 min
+  public static final long DEFAULT_BLOCKFIX_HISTORY_INTERVAL =
+    60 * 60 * 1000; // 60 mins
+
   private java.util.HashMap<String, java.util.Date> history;
-  private int blockFixInterval = 60*1000; // 1min
   private long numFilesFixed = 0;
-  private Configuration conf;
   private String xorPrefix;
-  private XOREncoder xorEncoder;
-  private XORDecoder xorDecoder;
+  private String rsPrefix;
+  private Encoder xorEncoder;
+  private Decoder xorDecoder;
+  private Encoder rsEncoder;
+  private Decoder rsDecoder;
+
+  // interval between checks for corrupt files
+  protected long blockFixInterval = DEFAULT_BLOCKFIX_INTERVAL;
+
+  // interval before fixing same file again
+  protected long historyInterval = DEFAULT_BLOCKFIX_HISTORY_INTERVAL;
+
+  public volatile boolean running = true;
 
-  boolean running = true;
 
   public BlockFixer(Configuration conf) throws IOException {
-    this.conf = conf;
+    super(conf);
     history = new java.util.HashMap<String, java.util.Date>();
-    blockFixInterval = conf.getInt("raid.blockfix.interval",
-                                   blockFixInterval);
-    xorPrefix = RaidNode.getDestinationPath(conf).toUri().getPath();
-    int stripeLength = RaidNode.getStripeLength(conf);
-    xorEncoder = new XOREncoder(conf, stripeLength);
-    xorDecoder = new XORDecoder(conf, stripeLength);
+    blockFixInterval = getConf().getInt(BLOCKFIX_INTERVAL,
+                                   (int) blockFixInterval);
+    xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath();
+    if (!xorPrefix.endsWith(Path.SEPARATOR)) {
+      xorPrefix += Path.SEPARATOR;
+    }
+    int stripeLength = RaidNode.getStripeLength(getConf());
+    xorEncoder = new XOREncoder(getConf(), stripeLength);
+    xorDecoder = new XORDecoder(getConf(), stripeLength);
+    rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath();
+    if (!rsPrefix.endsWith(Path.SEPARATOR)) {
+      rsPrefix += Path.SEPARATOR;
+    }
+    int parityLength = RaidNode.rsParityLength(getConf());
+    rsEncoder = new ReedSolomonEncoder(getConf(), stripeLength, parityLength);
+    rsDecoder = new ReedSolomonDecoder(getConf(), stripeLength, parityLength);
   }
 
   public void run() {
@@ -154,6 +191,7 @@ public class BlockFixer implements Runna
 
 
   void fixFile(Path srcPath) throws IOException {
+
     if (RaidNode.isParityHarPartFile(srcPath)) {
       processCorruptParityHarPartFile(srcPath);
       return;
@@ -165,24 +203,30 @@ public class BlockFixer implements Runna
       return;
     }
 
-    // The corrupted file is a source file
+    // The corrupted file is a ReedSolomon parity file
+    if (isRsParityFile(srcPath)) {
+      processCorruptParityFile(srcPath, rsEncoder);
+      return;
+    }
 
-    // Do we have a parity file for this file?
-    RaidNode.ParityFilePair ppair = null;
+    // The corrupted file is a source file
+    RaidNode.ParityFilePair ppair =
+      RaidNode.xorParityForSource(srcPath, getConf());
     Decoder decoder = null;
-    Path destPath = null;
-    try {
-      destPath = RaidNode.getDestinationPath(conf);
-      ppair = RaidNode.getParityFile(destPath, srcPath, conf);
+    if (ppair != null) {
+      decoder = xorDecoder;
+    } else  {
+      ppair = RaidNode.rsParityForSource(srcPath, getConf());
       if (ppair != null) {
-        decoder = xorDecoder;
+        decoder = rsDecoder;
       }
-    } catch (FileNotFoundException e) {
     }
+
     // If we have a parity file, process the file and fix it.
     if (ppair != null) {
-      processCorruptFile(srcPath, destPath, decoder);
+      processCorruptFile(srcPath, ppair, decoder);
     }
+
   }
 
   /**
@@ -193,8 +237,8 @@ public class BlockFixer implements Runna
    */
   void purgeHistory() {
     // Default history interval is 1 hour.
-    long historyInterval = conf.getLong(
-                             "raid.blockfix.history.interval", 3600*1000);
+    long historyInterval = getConf().getLong(
+                             BLOCKFIX_HISTORY_INTERVAL, 3600*1000);
     java.util.Date cutOff = new java.util.Date(
                                    System.currentTimeMillis()-historyInterval);
     List<String> toRemove = new java.util.ArrayList<String>();
@@ -217,14 +261,14 @@ public class BlockFixer implements Runna
   List<Path> getCorruptFiles() throws IOException {
     DistributedFileSystem dfs = getDFS(new Path("/"));
 
-    String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+    String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(getConf());
     List<Path> corruptFiles = new LinkedList<Path>();
     for (String file: nnCorruptFiles) {
       if (!history.containsKey(file)) {
         corruptFiles.add(new Path(file));
       }
     }
-    RaidUtils.filterTrash(conf, corruptFiles);
+    RaidUtils.filterTrash(getConf(), corruptFiles);
     return corruptFiles;
   }
 
@@ -235,11 +279,11 @@ public class BlockFixer implements Runna
     // TODO: We should first fix the files that lose more blocks
     Comparator<Path> comp = new Comparator<Path>() {
       public int compare(Path p1, Path p2) {
-        if (isXorParityFile(p2)) {
+        if (isXorParityFile(p2) || isRsParityFile(p2)) {
           // If p2 is a parity file, p1 is smaller.
           return -1;
         }
-        if (isXorParityFile(p1)) {
+        if (isXorParityFile(p1) || isRsParityFile(p1)) {
           // If p1 is a parity file, p2 is smaller.
           return 1;
         }
@@ -250,21 +294,13 @@ public class BlockFixer implements Runna
     Collections.sort(files, comp);
   }
 
-
-  /**
-   * Returns a DistributedFileSystem hosting the path supplied.
-   */
-  private DistributedFileSystem getDFS(Path p) throws IOException {
-    return (DistributedFileSystem) p.getFileSystem(conf);
-  }
-
   /**
    * Reads through a corrupt source file fixing corrupt blocks on the way.
    * @param srcPath Path identifying the corrupt file.
    * @throws IOException
    */
-  void processCorruptFile(Path srcPath, Path destPath, Decoder decoder)
-      throws IOException {
+  void processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair,
+      Decoder decoder) throws IOException {
     LOG.info("Processing corrupt file " + srcPath);
 
     DistributedFileSystem srcFs = getDFS(srcPath);
@@ -290,9 +326,6 @@ public class BlockFixer implements Runna
       localBlockFile.deleteOnExit();
 
       try {
-        RaidNode.ParityFilePair parityPair = RaidNode.getParityFile(
-            destPath, srcPath, conf);
-
         decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
           parityPair.getPath(), blockSize, corruptOffset, localBlockFile,
           blockContentsSize);
@@ -313,6 +346,9 @@ public class BlockFixer implements Runna
     numFilesFixed++;
   }
 
+  /**
+   * Checks whether the given path is an XOR parity file.
+   */
   boolean isXorParityFile(Path p) {
     String pathStr = p.toUri().getPath();
     if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
@@ -322,7 +358,25 @@ public class BlockFixer implements Runna
   }
 
   /**
-   * Reads through a parity file, fixing corrupt blocks on the way.
+   * Checks whether the given path is an RS parity file.
+   */
+  boolean isRsParityFile(Path p) {
+    String pathStr = p.toUri().getPath();
+    if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
+      return false;
+    }
+    return pathStr.startsWith(rsPrefix);
+  }
+
+  /**
+   * Returns a DistributedFileSystem hosting the path supplied.
+   */
+  protected DistributedFileSystem getDFS(Path p) throws IOException {
+    return (DistributedFileSystem) p.getFileSystem(getConf());
+  }
+
+  /**
+   * Fixes corrupt blocks in a parity file.
    * This function uses the corresponding source file to regenerate parity
    * file blocks.
    */
@@ -469,6 +523,8 @@ public class BlockFixer implements Runna
         Encoder encoder;
         if (isXorParityFile(parityFile)) {
           encoder = xorEncoder;
+        } else if (isRsParityFile(parityFile)) {
+          encoder = rsEncoder;
         } else {
           String msg = "Could not figure out parity file correctly";
           LOG.warn(msg);
@@ -605,7 +661,7 @@ public class BlockFixer implements Runna
     DataInputStream blockMetadata = null;
     try {
       blockContents = new FileInputStream(localBlockFile);
-      blockMetadata = computeMetadata(conf, blockContents);
+      blockMetadata = computeMetadata(getConf(), blockContents);
       blockContents.close();
       // Reopen
       blockContents = new FileInputStream(localBlockFile);
@@ -638,13 +694,13 @@ public class BlockFixer implements Runna
     InetSocketAddress target = NetUtils.createSocketAddr(datanode.name);
     Socket sock = SocketChannel.open().socket();
 
-    int readTimeout =  conf.getInt(
-        "raid.blockfix.read.timeout", HdfsConstants.READ_TIMEOUT);
+    int readTimeout = getConf().getInt(BLOCKFIX_READ_TIMEOUT,
+      HdfsConstants.READ_TIMEOUT);
     NetUtils.connect(sock, target, readTimeout);
     sock.setSoTimeout(readTimeout);
 
-    int writeTimeout = conf.getInt(
-        "raid.blockfix.write.timeout", HdfsConstants.WRITE_TIMEOUT);
+    int writeTimeout = getConf().getInt(BLOCKFIX_WRITE_TIMEOUT,
+      HdfsConstants.WRITE_TIMEOUT);
 
     OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
     DataOutputStream out = new DataOutputStream(
@@ -685,11 +741,18 @@ public class BlockFixer implements Runna
     }
   }
 
+  /**
+   * Returns the source file path corresponding to a parity file.
+   */
   Path sourcePathFromParityPath(Path parityPath) {
     String parityPathStr = parityPath.toUri().getPath();
     if (parityPathStr.startsWith(xorPrefix)) {
       // Remove the prefix to get the source file.
-      String src = parityPathStr.replaceFirst(xorPrefix, "");
+      String src = parityPathStr.replaceFirst(xorPrefix, "/");
+      return new Path(src);
+    } else if (parityPathStr.startsWith(rsPrefix)) {
+      // Remove the prefix to get the source file.
+      String src = parityPathStr.replaceFirst(rsPrefix, "/");
       return new Path(src);
     }
     return null;
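
BlockFixer now classifies a corrupt file by path prefix into one of three
cases: XOR parity, RS parity, or source file. This is why the constructor
normalizes xorPrefix and rsPrefix to end with Path.SEPARATOR, and why
sourcePathFromParityPath substitutes "/" rather than the empty string when
stripping the prefix. A small example with the default locations "/raid" and
"/raidrs" (see the RaidNode constants below):

    // With xorPrefix = "/raid/", a corrupt parity file "/raid/user/foo"
    // maps back to the source file "/user/foo".
    String parityPathStr = "/raid/user/foo";
    String xorPrefix = "/raid/";
    if (parityPathStr.startsWith(xorPrefix)) {
      Path src = new Path(parityPathStr.replaceFirst(xorPrefix, "/"));
      // src.toUri().getPath() is "/user/foo"
    }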

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java Fri Nov 12 00:17:22 2010
@@ -22,7 +22,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -146,8 +148,9 @@ class ConfigManager {
    * 
    <configuration>
     <srcPath prefix="hdfs://hadoop.myhost.com:9000/user/warehouse/u_full/*">
-      <destPath> hdfs://dfsarch.data.facebook.com:9000/archive/</destPath>
       <policy name = RaidScanWeekly>
+        <destPath> hdfs://dfsname.myhost.com:9000/archive/</destPath>
+        <parentPolicy> RaidScanMonthly</parentPolicy>
         <property>
           <name>targetReplication</name>
           <value>2</value>
@@ -220,6 +223,8 @@ class ConfigManager {
           "top-level element not <configuration>");
     NodeList elements = root.getChildNodes();
 
+    Map<String, PolicyInfo> existingPolicies =
+      new HashMap<String, PolicyInfo>();
     // loop through all the configured source paths.
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
@@ -231,15 +236,15 @@ class ConfigManager {
       if ("srcPath".equalsIgnoreCase(elementTagName)) {
         String srcPathPrefix = element.getAttribute("prefix");
 
-        if (srcPathPrefix == null || srcPathPrefix.length() == 0) {
-          throw new RaidConfigurationException("Bad configuration file: " + 
-             "srcPathPrefix not set.");
+        PolicyList policyList = null;
+        if (srcPathPrefix != null && srcPathPrefix.length() != 0) {
+          // An empty srcPath has no effect, but its policies are still processed.
+          // This allows us to define "abstract" policies.
+          policyList = new PolicyList();
+          all.add(policyList);
+          policyList.setSrcPath(conf, srcPathPrefix);
         }
-        PolicyList policyList = new PolicyList();
-        all.add(policyList);
 
-        policyList.setSrcPath(conf, srcPathPrefix);
-        
         // loop through all the policies for this source path
         NodeList policies = element.getChildNodes();
         for (int j = 0; j < policies.getLength(); j++) {
@@ -253,12 +258,13 @@ class ConfigManager {
               "Expecting <policy> for srcPath " + srcPathPrefix);
           }
           String policyName = policy.getAttribute("name");
-          PolicyInfo pinfo = new PolicyInfo(policyName, conf);
-          pinfo.setSrcPath(srcPathPrefix);
-          policyList.add(pinfo);
-
+          PolicyInfo curr = new PolicyInfo(policyName, conf);
+          if (srcPathPrefix != null && srcPathPrefix.length() > 0) {
+            curr.setSrcPath(srcPathPrefix);
+          }
           // loop through all the properties of this policy
           NodeList properties = policy.getChildNodes();
+          PolicyInfo parent = null;
           for (int k = 0; k < properties.getLength(); k++) {
             Node node2 = properties.item(k);
             if (!(node2 instanceof Element)) {
@@ -266,13 +272,16 @@ class ConfigManager {
             }
             Element property = (Element)node2;
             String propertyName = property.getTagName();
-            if ("destPath".equalsIgnoreCase(propertyName)) {
+            if ("erasureCode".equalsIgnoreCase(propertyName)) {
               String text = ((Text)property.getFirstChild()).getData().trim();
-              LOG.info(policyName + ".destPath = " + text);
-              pinfo.setDestinationPath(text);
+              LOG.info(policyName + ".erasureCode = " + text);
+              curr.setErasureCode(text);
             } else if ("description".equalsIgnoreCase(propertyName)) {
               String text = ((Text)property.getFirstChild()).getData().trim();
-              pinfo.setDescription(text);
+              curr.setDescription(text);
+            } else if ("parentPolicy".equalsIgnoreCase(propertyName)) {
+              String text = ((Text)property.getFirstChild()).getData().trim();
+              parent = existingPolicies.get(text);
             } else if ("property".equalsIgnoreCase(propertyName)) {
               NodeList nl = property.getChildNodes();
               String pname=null,pvalue=null;
@@ -291,7 +300,7 @@ class ConfigManager {
               }
               if (pname != null && pvalue != null) {
                 LOG.info(policyName + "." + pname + " = " + pvalue);
-                pinfo.setProperty(pname,pvalue);
+                curr.setProperty(pname,pvalue);
               }
             } else {
               LOG.info("Found bad property " + propertyName +
@@ -300,6 +309,20 @@ class ConfigManager {
                        ". Ignoring."); 
             }
           }  // done with all properties of this policy
+
+          PolicyInfo pinfo;
+          if (parent != null) {
+            pinfo = new PolicyInfo(policyName, conf);
+            pinfo.copyFrom(parent);
+            pinfo.copyFrom(curr);
+          } else {
+            pinfo = curr;
+          }
+          if (policyList != null) {
+            policyList.add(pinfo);
+          }
+          existingPolicies.put(policyName, pinfo);
+
         }    // done with all policies for this srcpath
       } 
     }        // done with all srcPaths
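
The new parentPolicy element lets a policy inherit from a previously defined
one; per the code above, the child is built by copying the parent first and
the child's own settings second, so the child's values win. Assuming
PolicyInfo.copyFrom overwrites only the fields set on its argument, the
layering is:

    // Child-overrides-parent merge, as done at the end of the policy loop.
    PolicyInfo pinfo = new PolicyInfo(policyName, conf);
    pinfo.copyFrom(parent);   // start from the parent's settings
    pinfo.copyFrom(curr);     // the child's own settings take precedence

So if RaidScanMonthly sets targetReplication=2 and a child policy naming it
as parentPolicy sets targetReplication=1, the merged policy uses 1, and any
property the child leaves unset falls back to the parent's value.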

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Fri Nov 12 00:17:22 2010
@@ -335,9 +335,6 @@ public class DistRaid {
          LOG.info("Job Complete(Failed): " + jobID);
        }
        raidPolicyPathPairList.clear();
-       Counters ctrs = runningJob.getCounters();
-       long filesRaided = ctrs.findCounter(Counter.FILES_SUCCEEDED).getValue();
-       long filesFailed = ctrs.findCounter(Counter.FILES_FAILED).getValue();
        return true;
      } else {
        String report =  (" job " + jobID +

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java Fri Nov 12 00:17:22 2010
@@ -107,7 +107,8 @@ public abstract class Encoder {
    * @param srcFile The source file.
    * @param parityFile The parity file to be generated.
    */
-  public void encodeFile(FileSystem fs, Path srcFile, Path parityFile,
+  public void encodeFile(
+    FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
     short parityRepl, Progressable reporter) throws IOException {
     FileStatus srcStat = fs.getFileStatus(srcFile);
     long srcSize = srcStat.getLen();
@@ -116,10 +117,13 @@ public abstract class Encoder {
     configureBuffers(blockSize);
 
     // Create a tmp file to which we will write first.
-    Path parityTmp = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
-                              parityFile.toUri().getPath() +
-                              "." + rand.nextLong() + ".tmp");
-    FSDataOutputStream out = fs.create(
+    Path tmpDir = getParityTempPath();
+    if (!parityFs.mkdirs(tmpDir)) {
+      throw new IOException("Could not create tmp dir " + tmpDir);
+    }
+    Path parityTmp = new Path(tmpDir,
+                            parityFile.getName() + rand.nextLong());
+    FSDataOutputStream out = parityFs.create(
                                parityTmp,
                                true,
                                conf.getInt("io.file.buffer.size", 64 * 1024),
@@ -133,11 +137,11 @@ public abstract class Encoder {
       LOG.info("Wrote temp parity file " + parityTmp);
 
       // delete destination if exists
-      if (fs.exists(parityFile)){
-        fs.delete(parityFile, false);
+      if (parityFs.exists(parityFile)){
+        parityFs.delete(parityFile, false);
       }
-      fs.mkdirs(parityFile.getParent());
-      if (!fs.rename(parityTmp, parityFile)) {
+      parityFs.mkdirs(parityFile.getParent());
+      if (!parityFs.rename(parityTmp, parityFile)) {
         String msg = "Unable to rename file " + parityTmp + " to " + parityFile;
         throw new IOException (msg);
       }
@@ -146,7 +150,7 @@ public abstract class Encoder {
       if (out != null) {
         out.close();
       }
-      fs.delete(parityTmp, false);
+      parityFs.delete(parityTmp, false);
     }
   }
 
@@ -225,7 +229,7 @@ public abstract class Encoder {
     LOG.info("Starting recovery by using source stripe " +
               srcFile + ":" + stripeStart);
     // Read the data from the blocks and write to the parity file.
-    encodeStripe(blocks, stripeStart, blockSize, outs, 
+    encodeStripe(blocks, stripeStart, blockSize, outs,
                  new RaidUtils.DummyProgressable());
   }
 
@@ -339,4 +343,9 @@ public abstract class Encoder {
     long blockSize,
     OutputStream[] outs,
     Progressable reporter) throws IOException;
+
+  /**
+   * Return the temp path for the parity file
+   */
+  protected abstract Path getParityTempPath();
 }
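
encodeFile now writes the parity to a temp path on the parity filesystem and
renames it into place, so a partially written parity file is never visible at
its final path. The pattern, reduced to its core; writeParity() is a
hypothetical stand-in for the stripe-encoding loop, and the buffer size,
replication and block size are assumed to be in scope:

    // Sketch: publish the parity file via temp-write plus rename.
    Path parityTmp = new Path(getParityTempPath(),
                              parityFile.getName() + rand.nextLong());
    FSDataOutputStream out = parityFs.create(parityTmp, true, bufferSize,
                                             parityRepl, blockSize);
    try {
      writeParity(out);                        // hypothetical helper
      out.close();
      out = null;
      parityFs.mkdirs(parityFile.getParent());
      if (!parityFs.rename(parityTmp, parityFile)) {
        throw new IOException("Unable to rename " + parityTmp);
      }
    } finally {
      if (out != null) {
        out.close();
      }
      parityFs.delete(parityTmp, false);       // no-op after a successful rename
    }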

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java Fri Nov 12 00:17:22 2010
@@ -218,4 +218,42 @@ public class RaidFilter {
       return len;
     }
   }
+
+  static class PreferenceFilter extends Configured
+    implements DirectoryTraversal.FileFilter {
+    Path firstChoicePrefix;
+    DirectoryTraversal.FileFilter secondChoiceFilter;
+
+    PreferenceFilter(Configuration conf,
+      Path firstChoicePrefix, Path secondChoicePrefix,
+      int targetRepl, long startTime, long modTimePeriod) {
+      super(conf);
+      this.firstChoicePrefix = firstChoicePrefix;
+      this.secondChoiceFilter = new TimeBasedFilter(conf,
+        secondChoicePrefix, targetRepl, startTime, modTimePeriod);
+    }
+
+    PreferenceFilter(Configuration conf,
+      Path firstChoicePrefix, Path secondChoicePrefix,
+      PolicyInfo info, List<PolicyInfo> allPolicies, long startTime,
+        Statistics stats) {
+      super(conf);
+      this.firstChoicePrefix = firstChoicePrefix;
+      this.secondChoiceFilter = new TimeBasedFilter(
+        conf, secondChoicePrefix, info, allPolicies, startTime, stats);
+    }
+
+    public boolean check(FileStatus f) throws IOException {
+      Object firstChoicePPair =
+        RaidNode.getParityFile(firstChoicePrefix, f.getPath(), getConf());
+      if (firstChoicePPair == null) {
+        // The decision is up to the second-choice filter.
+        return secondChoiceFilter.check(f);
+      } else {
+        // There is already a parity file under the first choice path.
+        // We don't want to choose this file.
+        return false;
+      }
+    }
+  }
 }
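
PreferenceFilter implements the rule that a file already raided under the
first-choice path should not be picked up again under the second-choice path.
As wired up in the RaidNode.TriggerMonitor hunk below, an XOR policy uses the
RS destination as its first choice, so files that have gained RS parity stop
being XOR candidates:

    // Sketch of the wiring done in TriggerMonitor for an XOR policy.
    DirectoryTraversal.FileFilter filter = new RaidFilter.PreferenceFilter(
        conf,
        RaidNode.rsDestinationPath(conf),    // first choice: existing RS parity
        RaidNode.xorDestinationPath(conf),   // second choice: time-based XOR filter
        info, allPolicies, startTime, stats);
    // filter.check(f) returns false when f already has RS parity;
    // otherwise it defers to the TimeBasedFilter for XOR.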

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Fri Nov 12 00:17:22 2010
@@ -20,12 +20,12 @@ package org.apache.hadoop.raid;
 
 import java.io.IOException;
 import java.io.FileNotFoundException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.LinkedList;
-import java.util.Iterator;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Random;
@@ -63,6 +63,7 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
 import org.apache.hadoop.raid.protocol.RaidProtocol;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
 
 /**
  * A base class that implements {@link RaidProtocol}.
@@ -77,19 +78,35 @@ public abstract class RaidNode implement
     Configuration.addDefaultResource("mapred-default.xml");
     Configuration.addDefaultResource("mapred-site.xml");
   }
-
   public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
   public static final long SLEEP_TIME = 10000L; // 10 seconds
   public static final int DEFAULT_PORT = 60000;
-  public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+  // Default stripe length = 5, parity length for RS code = 3
+  public static final int DEFAULT_STRIPE_LENGTH = 5;
+  public static final int RS_PARITY_LENGTH_DEFAULT = 3;
+
+  public static final String RS_PARITY_LENGTH_KEY = "hdfs.raidrs.paritylength";
   public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength";
+
   public static final String DEFAULT_RAID_LOCATION = "/raid";
   public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
+  public static final String DEFAULT_RAID_TMP_LOCATION = "/tmp/raid";
+  public static final String RAID_TMP_LOCATION_KEY = "fs.raid.tmpdir";
+  public static final String DEFAULT_RAID_HAR_TMP_LOCATION = "/tmp/raid_har";
+  public static final String RAID_HAR_TMP_LOCATION_KEY = "fs.raid.hartmpdir";
+
+  public static final String DEFAULT_RAIDRS_LOCATION = "/raidrs";
+  public static final String RAIDRS_LOCATION_KEY = "hdfs.raidrs.locations";
+  public static final String DEFAULT_RAIDRS_TMP_LOCATION = "/tmp/raidrs";
+  public static final String RAIDRS_TMP_LOCATION_KEY = "fs.raidrs.tmpdir";
+  public static final String DEFAULT_RAIDRS_HAR_TMP_LOCATION = "/tmp/raidrs_har";
+  public static final String RAIDRS_HAR_TMP_LOCATION_KEY = "fs.raidrs.hartmpdir";
+
   public static final String HAR_SUFFIX = "_raid.har";
   public static final Pattern PARITY_HAR_PARTFILE_PATTERN =
     Pattern.compile(".*" + HAR_SUFFIX + "/part-.*");
-  
-  public static final String RAIDNODE_CLASSNAME_KEY = "raid.classname";  
+
+  public static final String RAIDNODE_CLASSNAME_KEY = "raid.classname";
 
   /** RPC server */
   private Server server;
@@ -131,11 +148,11 @@ public abstract class RaidNode implement
     long metaSize;           // total disk space for meta files
 
     public void clear() {
-    	numProcessedBlocks = 0;
-    	processedSize = 0;
-    	remainingSize = 0;
-    	numMetaBlocks = 0;
-    	metaSize = 0;
+      numProcessedBlocks = 0;
+      processedSize = 0;
+      remainingSize = 0;
+      numMetaBlocks = 0;
+      metaSize = 0;
     }
     public String toString() {
       long save = processedSize - (remainingSize + metaSize);
@@ -310,13 +327,24 @@ public abstract class RaidNode implement
     // find stripe length from config
     int stripeLength = getStripeLength(conf);
 
-    Path destPref = getDestinationPath(conf);
+    // first try decode using XOR code
+    Path destPref = xorDestinationPath(conf);
     Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
     Path unraided = unRaid(conf, srcPath, destPref, decoder,
         stripeLength, corruptOffset);
     if (unraided != null) {
       return unraided.toString();
     }
+
+    // try decode using ReedSolomon code
+    destPref = rsDestinationPath(conf);
+    decoder = new ReedSolomonDecoder(conf, RaidNode.getStripeLength(conf),
+        RaidNode.rsParityLength(conf));
+    unraided = unRaid(conf, srcPath, destPref, decoder,
+        stripeLength, corruptOffset);
+    if (unraided != null) {
+      return unraided.toString();
+    }
     return null;
   }
 
@@ -329,7 +357,6 @@ public abstract class RaidNode implement
    * Periodically checks to see which policies should be fired.
    */
   class TriggerMonitor implements Runnable {
-
     class ScanState {
       long fullScanStartTime;
       DirectoryTraversal pendingTraversal;
@@ -393,7 +420,7 @@ public abstract class RaidNode implement
     */
     private List<FileStatus> selectFiles(
       PolicyInfo info, ArrayList<PolicyInfo> allPolicies) throws IOException {
-      Path destPrefix = getDestinationPath(conf);
+      Path destPrefix = getDestinationPath(info.getErasureCode(), conf);
       String policyName = info.getName();
       Path srcPath = info.getSrcPath();
       long modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
@@ -480,7 +507,6 @@ public abstract class RaidNode implement
             }
           }
         }
-
         for (PolicyInfo info: allPolicies) {
           if (!scanStateMap.containsKey(info.getName())) {
             scanStateMap.put(info.getName(), new ScanState());
@@ -526,8 +552,19 @@ public abstract class RaidNode implement
       long startTime, PolicyInfo info, List<PolicyInfo> allPolicies,
       RaidFilter.Statistics stats)
       throws IOException {
-      return new RaidFilter.TimeBasedFilter(conf, getDestinationPath(conf),
-        info, allPolicies, startTime, stats);
+      switch (info.getErasureCode()) {
+      case XOR:
+        // Return a preference-based filter that prefers RS parity files
+        // over XOR parity files.
+        return new RaidFilter.PreferenceFilter(
+          conf, rsDestinationPath(conf), xorDestinationPath(conf),
+          info, allPolicies, startTime, stats);
+      case RS:
+        return new RaidFilter.TimeBasedFilter(conf, rsDestinationPath(conf),
+          info, allPolicies, startTime, stats);
+      default:
+        return null;
+      }
     }
   }
 
@@ -623,13 +660,31 @@ public abstract class RaidNode implement
 
     return null; // NULL if no parity file
   }
-  
-  private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
-    
+
+  static ParityFilePair xorParityForSource(Path srcPath, Configuration conf)
+      throws IOException {
+    try {
+      Path destPath = xorDestinationPath(conf);
+      return getParityFile(destPath, srcPath, conf);
+    } catch (FileNotFoundException e) {
+    }
+    return null;
+  }
+
+  static ParityFilePair rsParityForSource(Path srcPath, Configuration conf)
+      throws IOException {
+    try {
+      Path destPath = rsDestinationPath(conf);
+      return getParityFile(destPath, srcPath, conf);
+    } catch (FileNotFoundException e) {
+    }
+    return null;
+  }
+
+  private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath)
+      throws IOException {
     return getParityFile(destPathPrefix, srcPath, conf);
-    
   }
-  
 
   /**
    * RAID a list of files.
@@ -639,7 +694,7 @@ public abstract class RaidNode implement
     int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
     int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
     int stripeLength = getStripeLength(conf);
-    Path destPref = getDestinationPath(conf);
+    Path destPref = getDestinationPath(info.getErasureCode(), conf);
     String simulate = info.getProperty("simulate");
     boolean doSimulate = simulate == null ? false : Boolean
         .parseBoolean(simulate);
@@ -648,8 +703,9 @@ public abstract class RaidNode implement
     int count = 0;
 
     for (FileStatus s : paths) {
-      doRaid(conf, s, destPref, statistics, new RaidUtils.DummyProgressable(),
-             doSimulate, targetRepl, metaRepl, stripeLength);
+      doRaid(conf, s, destPref, info.getErasureCode(), statistics,
+        new RaidUtils.DummyProgressable(), doSimulate, targetRepl, metaRepl,
+        stripeLength);
       if (count % 1000 == 0) {
         LOG.info("RAID statistics " + statistics.toString());
       }
@@ -664,25 +720,28 @@ public abstract class RaidNode implement
    */
 
   static public void doRaid(Configuration conf, PolicyInfo info,
-      FileStatus src, Statistics statistics, Progressable reporter) throws IOException {
+      FileStatus src, Statistics statistics, Progressable reporter)
+      throws IOException {
     int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
     int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
     int stripeLength = getStripeLength(conf);
-    Path destPref = getDestinationPath(conf);
+
+    Path destPref = getDestinationPath(info.getErasureCode(), conf);
     String simulate = info.getProperty("simulate");
     boolean doSimulate = simulate == null ? false : Boolean
         .parseBoolean(simulate);
 
-    doRaid(conf, src, destPref, statistics, reporter, doSimulate,
-           targetRepl, metaRepl, stripeLength);
+    doRaid(conf, src, destPref, info.getErasureCode(), statistics,
+                  reporter, doSimulate, targetRepl, metaRepl, stripeLength);
   }
 
   /**
    * RAID an individual file
    */
   static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
-                      Statistics statistics, Progressable reporter, boolean doSimulate,
-                      int targetRepl, int metaRepl, int stripeLength) 
+                      PolicyInfo.ErasureCodeType code, Statistics statistics,
+                      Progressable reporter, boolean doSimulate,
+                      int targetRepl, int metaRepl, int stripeLength)
     throws IOException {
     Path p = stat.getPath();
     FileSystem srcFs = p.getFileSystem(conf);
@@ -704,7 +763,8 @@ public abstract class RaidNode implement
     statistics.processedSize += diskSpace;
 
     // generate parity file
-    generateParityFile(conf, stat, reporter, srcFs, destPath, locations, metaRepl, stripeLength);
+    generateParityFile(conf, stat, reporter, srcFs, destPath, code,
+        locations, metaRepl, stripeLength);
 
     // reduce the replication factor of the source file
     if (!doSimulate) {
@@ -740,7 +800,9 @@ public abstract class RaidNode implement
   static private void generateParityFile(Configuration conf, FileStatus stat,
                                   Progressable reporter,
                                   FileSystem inFs,
-                                  Path destPathPrefix, BlockLocation[] locations,
+                                  Path destPathPrefix,
+                                  ErasureCodeType code,
+                                  BlockLocation[] locations,
                                   int metaRepl, int stripeLength) throws IOException {
 
     Path inpath = stat.getPath();
@@ -751,16 +813,16 @@ public abstract class RaidNode implement
     try {
       FileStatus stmp = outFs.getFileStatus(outpath);
       if (stmp.getModificationTime() == stat.getModificationTime()) {
-        LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath +
-                 " already upto-date. Nothing more to do.");
+        LOG.info("Parity file for " + inpath + "(" + locations.length +
+              ") is " + outpath + " already upto-date. Nothing more to do.");
         return;
       }
     } catch (IOException e) {
       // ignore errors because the raid file might not exist yet.
     }
 
-    XOREncoder encoder = new XOREncoder(conf, stripeLength);
-    encoder.encodeFile(inFs, inpath, outpath, (short)metaRepl, reporter);
+    Encoder encoder = encoderForCode(conf, code);
+    encoder.encodeFile(inFs, inpath, outFs, outpath, (short)metaRepl, reporter);
 
     // set the modification time of the RAID file. This is done so that the modTime of the
     // RAID file reflects that contents of the source file that it has RAIDed. This should
@@ -792,8 +854,8 @@ public abstract class RaidNode implement
       return null;
     }
 
-    final Path recoveryDestination = 
-      new Path(conf.get("fs.raid.tmpdir", "/tmp/raid"));
+    final Path recoveryDestination = new Path(
+      RaidNode.unraidTmpDirectory(conf));
     FileSystem destFs = recoveryDestination.getFileSystem(conf);
     final Path recoveredPrefix = 
       destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
@@ -827,6 +889,43 @@ public abstract class RaidNode implement
     }
 
     /**
+     * Traverse the parity destination directory, removing directories that
+     * no longer exist in the source.
+     * @throws IOException
+     */
+    private void purgeDirectories(FileSystem fs, Path root) throws IOException {
+      String prefix = root.toUri().getPath();
+      List<FileStatus> startPaths = new LinkedList<FileStatus>();
+      try {
+        startPaths.add(fs.getFileStatus(root));
+      } catch (FileNotFoundException e) {
+        return;
+      }
+      DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+      FileStatus dir = dt.getNextDirectory();
+      for (; dir != null; dir = dt.getNextDirectory()) {
+        Path dirPath = dir.getPath();
+        if (dirPath.toUri().getPath().endsWith(HAR_SUFFIX)) {
+          continue;
+        }
+        String dirStr = dirPath.toUri().getPath();
+        if (!dirStr.startsWith(prefix)) {
+          continue;
+        }
+        String src = dirStr.replaceFirst(prefix, "");
+        if (src.length() == 0) continue;
+        Path srcPath = new Path(src);
+        if (!fs.exists(srcPath)) {
+          LOG.info("Purging directory " + dirPath);
+          boolean done = fs.delete(dirPath, true);
+          if (!done) {
+            LOG.error("Could not purge " + dirPath);
+          }
+        }
+      }
+    }
+
+    /**
      * Delete orphaned files. The reason this is done by a separate thread 
      * is to not burden the TriggerMonitor with scanning the 
      * destination directories.
@@ -844,13 +943,23 @@ public abstract class RaidNode implement
         LOG.info("Started purge scan");
         prevExec = now();
 
+        // expand destination prefix path
+        Path destPref = xorDestinationPath(conf);
+        FileSystem destFs = destPref.getFileSystem(conf);
+        purgeDirectories(destFs, destPref);
+
+        destPref = rsDestinationPath(conf);
+        destFs = destPref.getFileSystem(conf);
+        purgeDirectories(destFs, destPref);
+
+        // fetch all categories
         for (PolicyList category : configMgr.getAllPolicies()) {
           for (PolicyInfo info: category.getAll()) {
 
             try {
               // expand destination prefix path
-              Path destPref = getDestinationPath(conf);
-              FileSystem destFs = destPref.getFileSystem(conf);
+              destPref = getDestinationPath(info.getErasureCode(), conf);
+              destFs = destPref.getFileSystem(conf);
 
               //get srcPaths
               Path[] srcPaths = info.getSrcPathExpanded();
@@ -870,7 +979,8 @@ public abstract class RaidNode implement
                   if (stat != null) {
                     LOG.info("Purging obsolete parity files for policy " + 
                               info.getName() + " " + destPath);
-                    recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat);
+                    recursePurge(info.getErasureCode(), srcFs, destFs,
+                              destPref.toUri().getPath(), stat);
                   }
 
                 }
@@ -889,7 +999,8 @@ public abstract class RaidNode implement
      * The destPrefix is the absolute pathname of the destinationPath
      * specified in the policy (without the host:port)
      */ 
-    private void recursePurge(FileSystem srcFs, FileSystem destFs,
+    void recursePurge(ErasureCodeType code,
+                              FileSystem srcFs, FileSystem destFs,
                               String destPrefix, FileStatus dest) 
       throws IOException {
 
@@ -901,7 +1012,7 @@ public abstract class RaidNode implement
       if (dest.isDirectory() && destStr.endsWith(HAR_SUFFIX)) {
         try {
           int harUsedPercent =
-            usefulHar(srcFs, destFs, destPath, destPrefix, conf);
+            usefulHar(code, srcFs, destFs, destPath, destPrefix, conf);
           LOG.info("Useful percentage of " + destStr + " " + harUsedPercent);
           // Delete the har if its usefulness reaches a threshold.
           if (harUsedPercent <= conf.getInt("raid.har.usage.threshold", 0)) {
@@ -940,7 +1051,7 @@ public abstract class RaidNode implement
         }
         if (files != null) {
           for (FileStatus one:files) {
-            recursePurge(srcFs, destFs, destPrefix, one);
+            recursePurge(code, srcFs, destFs, destPrefix, one);
           }
         }
         // If the directory is empty now, it will be purged the next time this
@@ -950,12 +1061,40 @@ public abstract class RaidNode implement
       
       String src = destStr.replaceFirst(destPrefix, "");
       
-      // if the source path does not exist or the parity file has been HARed, 
-      // then delete the parity file
       Path srcPath = new Path(src);
-      Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
-      if (!srcFs.exists(srcPath) || 
-          !destPath.equals(getParityFile(dstPath,srcPath).getPath())) {
+      boolean shouldDelete = false;
+
+      if (!srcFs.exists(srcPath)) {
+        shouldDelete = true;
+      } else {
+        try {
+          // If there is a RS parity file, the XOR parity can be deleted.
+          if (code == ErasureCodeType.XOR) {
+            ParityFilePair ppair = getParityFile(
+               getDestinationPath(ErasureCodeType.RS, conf), srcPath, conf);
+            if (ppair != null) {
+              shouldDelete = true;
+            }
+          }
+          if (!shouldDelete) {
+            Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
+            ParityFilePair ppair = getParityFile(dstPath,srcPath);
+            // If the parity file is not the appropriate one for the source or
+            // the parityFs is not the same as this file's filesystem
+            // (it is a HAR), this file can be deleted.
+            if ( ppair == null ||
+                 !destFs.equals(ppair.getFileSystem()) ||
+                 !destPath.equals(ppair.getPath())) {
+              shouldDelete = true;
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("Error during purging " + src + " " +
+                   StringUtils.stringifyException(e));
+        }
+      }
+
+      if (shouldDelete) {
         boolean done = destFs.delete(destPath, false);
         if (done) {
           LOG.info("Purged file " + destPath );
@@ -971,6 +1110,7 @@ public abstract class RaidNode implement
   // total number of files in the har.
   //
   protected static int usefulHar(
+    ErasureCodeType code,
     FileSystem srcFs, FileSystem destFs,
     Path harPath, String destPrefix, Configuration conf) throws IOException {
 
@@ -996,6 +1136,15 @@ public abstract class RaidNode implement
         continue;
       }
       String src = parityStr.substring(prefixToReplace.length());
+      if (code == ErasureCodeType.XOR) {
+        ParityFilePair ppair = getParityFile(
+          getDestinationPath(ErasureCodeType.RS, conf), new Path(src), conf);
+        if (ppair != null) {
+          // There is a valid RS parity file, so the XOR one is useless.
+          numUseless++;
+          continue;
+        }
+      }
       try {
         FileStatus srcStatus = srcFs.getFileStatus(new Path(src));
         if (srcStatus == null) {
@@ -1026,18 +1175,16 @@ public abstract class RaidNode implement
       LOG.info("Started archive scan");
       prevExec = now();
 
+      // fetch all categories
       for (PolicyList category : configMgr.getAllPolicies()) {
         for (PolicyInfo info: category.getAll()) {
+          String tmpHarPath = tmpHarPathForCode(conf, info.getErasureCode());
           String str = info.getProperty("time_before_har");
-          String tmpHarPath = info.getProperty("har_tmp_dir");
-          if (tmpHarPath == null) {
-            tmpHarPath = "/tmp/raid_har";
-          }
           if (str != null) {
             try {
               long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
 
-              Path destPref = getDestinationPath(conf);
+              Path destPref = getDestinationPath(info.getErasureCode(), conf);
               FileSystem destFs = destPref.getFileSystem(conf); 
 
               //get srcPaths
@@ -1074,7 +1221,7 @@ public abstract class RaidNode implement
     return;
   }
   
-  private void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
+  void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
     String destPrefix, FileSystem srcFs, long cutoff, String tmpHarPath)
     throws IOException {
 
@@ -1198,10 +1345,71 @@ public abstract class RaidNode implement
   }
 
   /**
-   * Return the path prefix that stores the parity files
+   * Return the temp path for XOR parity files
    */
-  static Path getDestinationPath(Configuration conf)
-      throws IOException {
+  public static String unraidTmpDirectory(Configuration conf) {
+    return conf.get(RAID_TMP_LOCATION_KEY, DEFAULT_RAID_TMP_LOCATION);
+  }
+
+  /**
+   * Return the temp path for Reed-Solomon parity files
+   */
+  public static String rsTempPrefix(Configuration conf) {
+    return conf.get(RAIDRS_TMP_LOCATION_KEY, DEFAULT_RAIDRS_TMP_LOCATION);
+  }
+
+  /**
+   * Return the temp path used when archiving XOR parity files into HARs
+   */
+  public static String xorHarTempPrefix(Configuration conf) {
+    return conf.get(RAID_HAR_TMP_LOCATION_KEY, DEFAULT_RAID_HAR_TMP_LOCATION);
+  }
+
+  /**
+   * Return the temp path used when archiving Reed-Solomon parity files into HARs
+   */
+  public static String rsHarTempPrefix(Configuration conf) {
+    return conf.get(RAIDRS_HAR_TMP_LOCATION_KEY,
+        DEFAULT_RAIDRS_HAR_TMP_LOCATION);
+  }
+
+  /**
+   * Return the destination path for Reed-Solomon parity files
+   */
+  public static Path rsDestinationPath(Configuration conf, FileSystem fs) {
+    String loc = conf.get(RAIDRS_LOCATION_KEY, DEFAULT_RAIDRS_LOCATION);
+    Path p = new Path(loc.trim());
+    p = p.makeQualified(fs);
+    return p;
+  }
+
+  /**
+   * Return the destination path for Reed-Solomon parity files
+   */
+  public static Path rsDestinationPath(Configuration conf)
+    throws IOException {
+    String loc = conf.get(RAIDRS_LOCATION_KEY, DEFAULT_RAIDRS_LOCATION);
+    Path p = new Path(loc.trim());
+    FileSystem fs = FileSystem.get(p.toUri(), conf);
+    p = p.makeQualified(fs);
+    return p;
+  }
+
+  /**
+   * Return the destination path for XOR parity files
+   */
+  public static Path xorDestinationPath(Configuration conf, FileSystem fs) {
+    String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
+    Path p = new Path(loc.trim());
+    p = p.makeQualified(fs);
+    return p;
+  }
+
+  /**
+   * Return the destination path for XOR parity files
+   */
+  public static Path xorDestinationPath(Configuration conf)
+    throws IOException {
     String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
     Path p = new Path(loc.trim());
     FileSystem fs = FileSystem.get(p.toUri(), conf);
@@ -1210,12 +1418,57 @@ public abstract class RaidNode implement
   }
 
   /**
+   * Return the path prefix that stores the parity files
+   */
+  static Path getDestinationPath(ErasureCodeType code, Configuration conf)
+    throws IOException {
+    switch (code) {
+      case XOR:
+        return xorDestinationPath(conf);
+      case RS:
+        return rsDestinationPath(conf);
+      default:
+        return null;
+    }
+  }
+
+  static Encoder encoderForCode(Configuration conf, ErasureCodeType code) {
+    int stripeLength = getStripeLength(conf);
+    switch (code) {
+      case XOR:
+        return new XOREncoder(conf, stripeLength);
+      case RS:
+        return new ReedSolomonEncoder(conf, stripeLength, rsParityLength(conf));
+      default:
+        return null;
+    }
+  }
+
+  static String tmpHarPathForCode(Configuration conf, ErasureCodeType code) {
+    switch (code) {
+      case XOR:
+        return xorHarTempPrefix(conf);
+      case RS:
+        return rsHarTempPrefix(conf);
+      default:
+        return null;
+    }
+  }
+
+  /**
    * Obtain stripe length from configuration
    */
   public static int getStripeLength(Configuration conf) {
     return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
   }
 
+  /**
+   * Obtain the Reed-Solomon parity length from configuration
+   */
+  public static int rsParityLength(Configuration conf) {
+    return conf.getInt(RS_PARITY_LENGTH_KEY, RS_PARITY_LENGTH_DEFAULT);
+  }
+
   static boolean isParityHarPartFile(Path p) {
     Matcher m = PARITY_HAR_PARTFILE_PATTERN.matcher(p.toUri().getPath());
     return m.matches();

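Note: the switch-based helpers above (getDestinationPath, encoderForCode, tmpHarPathForCode) centralize the per-code dispatch so the rest of RaidNode no longer hard-codes XOR paths. A minimal usage sketch, not part of this patch; it assumes a caller inside the org.apache.hadoop.raid package, since these helpers are package-private:

    package org.apache.hadoop.raid;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;

    public class CodeDispatchSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        for (ErasureCodeType code : ErasureCodeType.values()) {
          // Where parity files for this code are stored.
          Path dest = RaidNode.getDestinationPath(code, conf);
          // Which encoder produces parity for this code.
          Encoder encoder = RaidNode.encoderForCode(conf, code);
          // Which temp dir HAR archiving uses for this code.
          String harTmp = RaidNode.tmpHarPathForCode(conf, code);
          System.out.println(code + " -> " + dest + ", "
              + encoder.getClass().getSimpleName() + ", " + harTmp);
        }
      }
    }
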
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java Fri Nov 12 00:17:22 2010
@@ -39,7 +39,7 @@ public class RaidUtils {
    *
    * We could have used Reporter.NULL here but that would introduce
    * a dependency on mapreduce.
-   */ 
+   */
   public static class DummyProgressable implements Progressable {
     /**
      * Do nothing.

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java Fri Nov 12 00:17:22 2010
@@ -22,9 +22,9 @@ public class ReedSolomonCode implements 
 
   private final int stripeSize;
   private final int paritySize;
-  private final int[] generatingRoots;
   private final int[] generatingPolynomial;
   private final int PRIMITIVE_ROOT = 2;
+  private final int[] primitivePower;
   private final GaloisField GF = new GaloisField();
   private int[] errSignature;
   private final int[] paritySymbolLocations;
@@ -40,14 +40,17 @@ public class ReedSolomonCode implements 
     for (int i = 0; i < paritySize; i++) {
       paritySymbolLocations[i] = i;
     }
-    this.generatingRoots = new int[paritySize];
-    
+
+    this.primitivePower = new int[stripeSize + paritySize];
+    // compute powers of the primitive root
+    for (int i = 0; i < stripeSize + paritySize; i++) {
+      primitivePower[i] = GF.power(PRIMITIVE_ROOT, i);
+    }
     // compute generating polynomial
     int[] gen = {1};
     int[] poly = new int[2];
     for (int i = 0; i < paritySize; i++) {
-      generatingRoots[i] = GF.power(PRIMITIVE_ROOT, i);
-      poly[0] = generatingRoots[i];
+      poly[0] = primitivePower[i];
       poly[1] = 1;
       gen = GF.multiply(gen, poly);
     }
@@ -80,8 +83,8 @@ public class ReedSolomonCode implements 
       data[erasedLocation[i]] = 0;
     }
     for (int i = 0; i < erasedLocation.length; i++) {
-      errSignature[i] = GF.power(PRIMITIVE_ROOT, erasedLocation[i]);
-      erasedValue[i] = GF.substitute(data, generatingRoots[i]);
+      errSignature[i] = primitivePower[erasedLocation[i]];
+      erasedValue[i] = GF.substitute(data, primitivePower[i]);
     }
     GF.solveVandermondeSystem(errSignature, erasedValue, erasedLocation.length);
   }

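Note: the change above replaces per-call GF.power computations with a precomputed primitivePower table covering all stripeSize + paritySize codeword positions, which both encode and decode now index into. A round-trip sketch of the ReedSolomonCode API follows; it is not part of this patch, assumes byte-wide symbols (consistent with the 0xFF masking in the encoder/decoder below), and follows the decoder's layout in which the paritySize parity symbols occupy codeword locations 0 through paritySize - 1:

    package org.apache.hadoop.raid;

    public class RsRoundTripSketch {
      public static void main(String[] args) {
        int stripeSize = 10, paritySize = 4;
        ReedSolomonCode rs = new ReedSolomonCode(stripeSize, paritySize);

        // Arbitrary byte-wide data symbols.
        int[] data = new int[stripeSize];
        for (int i = 0; i < stripeSize; i++) {
          data[i] = (i * 37 + 11) & 0xFF;
        }
        int[] parity = new int[paritySize];
        rs.encode(data, parity);

        // Full codeword: parity symbols first, then data symbols.
        int[] codeword = new int[paritySize + stripeSize];
        System.arraycopy(parity, 0, codeword, 0, paritySize);
        System.arraycopy(data, 0, codeword, paritySize, stripeSize);

        // Erase one parity symbol and one data symbol; decode() zeroes the
        // erased locations itself before solving for them.
        int[] erased = { 1, paritySize + 3 };
        int[] recovered = new int[erased.length];
        rs.decode(codeword, erased, recovered);

        System.out.println(recovered[0] == parity[1]);  // expect true
        System.out.println(recovered[1] == data[3]);    // expect true
      }
    }
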
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockMissingException;
+
+public class ReedSolomonDecoder extends Decoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.ReedSolomonDecoder");
+  private ErasureCode reedSolomonCode;
+
+  public ReedSolomonDecoder(
+    Configuration conf, int stripeSize, int paritySize) {
+    super(conf, stripeSize, paritySize);
+    this.reedSolomonCode = new ReedSolomonCode(stripeSize, paritySize);
+  }
+
+  @Override
+  protected void fixErasedBlock(
+      FileSystem fs, Path srcFile,
+      FileSystem parityFs, Path parityFile,
+      long blockSize, long errorOffset, long bytesToSkip, long limit,
+      OutputStream out) throws IOException {
+    FSDataInputStream[] inputs = new FSDataInputStream[stripeSize + paritySize];
+    int[] erasedLocations = buildInputs(fs, srcFile, parityFs, parityFile,
+                                        errorOffset, inputs);
+    int blockIdxInStripe = (int) ((errorOffset / blockSize) % stripeSize);
+    int erasedLocationToFix = paritySize + blockIdxInStripe;
+    writeFixedBlock(inputs, erasedLocations, erasedLocationToFix,
+                    bytesToSkip, limit, out);
+  }
+
+  protected int[] buildInputs(FileSystem fs, Path srcFile,
+                              FileSystem parityFs, Path parityFile,
+                              long errorOffset, FSDataInputStream[] inputs)
+      throws IOException {
+    LOG.info("Building inputs to recover block starting at " + errorOffset);
+    FileStatus srcStat = fs.getFileStatus(srcFile);
+    long blockSize = srcStat.getBlockSize();
+    long blockIdx = errorOffset / blockSize;
+    long stripeIdx = blockIdx / stripeSize;
+    LOG.info("FileSize = " + srcStat.getLen() + ", blockSize = " + blockSize +
+             ", blockIdx = " + blockIdx + ", stripeIdx = " + stripeIdx);
+    ArrayList<Integer> erasedLocations = new ArrayList<Integer>();
+    // First open streams to the parity blocks.
+    for (int i = 0; i < paritySize; i++) {
+      long offset = blockSize * (stripeIdx * paritySize + i);
+      FSDataInputStream in = parityFs.open(
+        parityFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+      in.seek(offset);
+      LOG.info("Adding " + parityFile + ":" + offset + " as input " + i);
+      inputs[i] = in;
+    }
+    // Now open streams to the data blocks.
+    for (int i = paritySize; i < paritySize + stripeSize; i++) {
+      long offset = blockSize * (stripeIdx * stripeSize + i - paritySize);
+      if (offset == errorOffset) {
+        LOG.info(srcFile + ":" + offset +
+            " is known to have error, adding zeros as input " + i);
+        inputs[i] = new FSDataInputStream(new RaidUtils.ZeroInputStream(
+            offset + blockSize));
+        erasedLocations.add(i);
+      } else if (offset > srcStat.getLen()) {
+        LOG.info(srcFile + ":" + offset +
+                 " is past file size, adding zeros as input " + i);
+        inputs[i] = new FSDataInputStream(new RaidUtils.ZeroInputStream(
+            offset + blockSize));
+      } else {
+        FSDataInputStream in = fs.open(
+          srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+        in.seek(offset);
+        LOG.info("Adding " + srcFile + ":" + offset + " as input " + i);
+        inputs[i] = in;
+      }
+    }
+    if (erasedLocations.size() > paritySize) {
+      String msg = "Too many erased locations: " + erasedLocations.size();
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+    int[] locs = new int[erasedLocations.size()];
+    for (int i = 0; i < locs.length; i++) {
+      locs[i] = erasedLocations.get(i);
+    }
+    return locs;
+  }
+
+  /**
+   * Decode the inputs provided and write to the output.
+   * @param inputs array of inputs.
+   * @param erasedLocations indexes in the inputs which are known to be erased.
+   * @param erasedLocationToFix index in the inputs which needs to be fixed.
+   * @param skipBytes number of bytes to skip before writing to output.
+   * @param limit maximum number of bytes to be written/skipped.
+   * @param out the output.
+   * @throws IOException
+   */
+  void writeFixedBlock(
+          FSDataInputStream[] inputs,
+          int[] erasedLocations,
+          int erasedLocationToFix,
+          long skipBytes,
+          long limit,
+          OutputStream out) throws IOException {
+
+    LOG.info("Need to write " + (limit - skipBytes) +
+             " bytes for erased location index " + erasedLocationToFix);
+    int[] tmp = new int[inputs.length];
+    int[] decoded = new int[erasedLocations.length];
+    long toDiscard = skipBytes;
+    // Loop while the number of skipped + written bytes is less than the max.
+    for (long written = 0; skipBytes + written < limit; ) {
+      erasedLocations = readFromInputs(inputs, erasedLocations, limit);
+      if (decoded.length != erasedLocations.length) {
+        decoded = new int[erasedLocations.length];
+      }
+
+      int toWrite = (int)Math.min((long)bufSize, limit - (skipBytes + written));
+      if (toDiscard >= toWrite) {
+        toDiscard -= toWrite;
+        continue;
+      }
+
+      // Decode bufSize bytes of data.
+      for (int i = 0; i < bufSize; i++) {
+        performDecode(readBufs, writeBufs, i, tmp, erasedLocations, decoded);
+      }
+
+      for (int i = 0; i < erasedLocations.length; i++) {
+        if (erasedLocations[i] == erasedLocationToFix) {
+          toWrite -= toDiscard;
+          out.write(writeBufs[i], (int)toDiscard, toWrite);
+          toDiscard = 0;
+          written += toWrite;
+          LOG.debug("Wrote " + toWrite + " bytes for erased location index " +
+                    erasedLocationToFix);
+          break;
+        }
+      }
+    }
+  }
+
+  int[] readFromInputs(
+          FSDataInputStream[] inputs,
+          int[] erasedLocations,
+          long limit) throws IOException {
+    // Read bufSize bytes of data from every input.
+    for (int i = 0; i < inputs.length; i++) {
+      long curPos = inputs[i].getPos();
+      try {
+        RaidUtils.readTillEnd(inputs[i], readBufs[i], true);
+        continue;
+      } catch (BlockMissingException e) {
+        LOG.error("Encountered BlockMissingException in stream " + i);
+      } catch (ChecksumException e) {
+        LOG.error("Encountered ChecksumException in stream " + i);
+      }
+
+      // Found a new erased location.
+      if (erasedLocations.length == paritySize) {
+        String msg = "Too many read errors";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+
+      // Add this stream to the set of erased locations.
+      int[] newErasedLocations = new int[erasedLocations.length + 1];
+      for (int j = 0; j < erasedLocations.length; j++) {
+        newErasedLocations[j] = erasedLocations[j];
+      }
+      newErasedLocations[newErasedLocations.length - 1] = i;
+      erasedLocations = newErasedLocations;
+
+      LOG.info("Using zeros for stream " + i);
+      inputs[i] = new FSDataInputStream(
+        new RaidUtils.ZeroInputStream(curPos + limit));
+      inputs[i].seek(curPos);
+      RaidUtils.readTillEnd(inputs[i], readBufs[i], true);
+    }
+    return erasedLocations;
+  }
+
+  void performDecode(byte[][] readBufs, byte[][] writeBufs,
+                     int idx, int[] inputs,
+                     int[] erasedLocations, int[] decoded) {
+    for (int i = 0; i < decoded.length; i++) {
+      decoded[i] = 0;
+    }
+    for (int i = 0; i < inputs.length; i++) {
+      inputs[i] = readBufs[i][idx] & 0x000000FF;
+    }
+    reedSolomonCode.decode(inputs, erasedLocations, decoded);
+    for (int i = 0; i < decoded.length; i++) {
+      writeBufs[i][idx] = (byte)decoded[i];
+    }
+  }
+
+}

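Note: fixErasedBlock() and buildInputs() above map a byte offset in the source file to a slot in the combined input array, where the paritySize parity streams occupy slots 0 through paritySize - 1 and the stripe's data blocks follow. A worked example of that arithmetic with illustrative numbers (not part of this patch):

    public class IndexMappingSketch {
      public static void main(String[] args) {
        long blockSize = 8192L;
        int stripeSize = 10, paritySize = 4;
        long errorOffset = 13 * blockSize;                     // 14th block of the file

        long blockIdx = errorOffset / blockSize;               // 13
        long stripeIdx = blockIdx / stripeSize;                // stripe 1
        int blockIdxInStripe = (int) (blockIdx % stripeSize);  // block 3 within the stripe

        // Parity streams fill input slots 0..paritySize-1, so the erased data
        // block is input slot paritySize + blockIdxInStripe.
        int erasedLocationToFix = paritySize + blockIdxInStripe;
        System.out.println(erasedLocationToFix);               // 7

        // Parity stream i for this stripe starts at this offset in the parity
        // file, mirroring the seek in buildInputs().
        int i = 0;
        long parityOffset = blockSize * (stripeIdx * paritySize + i);
        System.out.println(parityOffset);                      // 32768
      }
    }
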
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+public class ReedSolomonEncoder extends Encoder {
+  public static final Log LOG = LogFactory.getLog(
+                                  "org.apache.hadoop.raid.ReedSolomonEncoder");
+  private ErasureCode reedSolomonCode;
+
+  public ReedSolomonEncoder(
+    Configuration conf, int stripeSize, int paritySize) {
+    super(conf, stripeSize, paritySize);
+    this.reedSolomonCode = new ReedSolomonCode(stripeSize, paritySize);
+  }
+
+  protected void encodeStripe(
+    InputStream[] blocks,
+    long stripeStartOffset,
+    long blockSize,
+    OutputStream[] outs,
+    Progressable reporter) throws IOException {
+
+    int[] data = new int[stripeSize];
+    int[] code = new int[paritySize];
+
+    for (long encoded = 0; encoded < blockSize; encoded += bufSize) {
+      // Read bufSize bytes of data from each block.
+      for (int i = 0; i < blocks.length; i++) {
+        RaidUtils.readTillEnd(blocks[i], readBufs[i], true);
+      }
+
+      // Encode the data read.
+      for (int j = 0; j < bufSize; j++) {
+        performEncode(readBufs, writeBufs, j, data, code);
+      }
+
+      // Now that we have some data to write, send it to the temp files.
+      for (int i = 0; i < paritySize; i++) {
+        outs[i].write(writeBufs[i], 0, bufSize);
+      }
+
+      if (reporter != null) {
+        reporter.progress();
+      }
+    }
+  }
+
+  void performEncode(byte[][] readBufs, byte[][] writeBufs, int idx,
+                          int[] data, int[] code) {
+    for (int i = 0; i < paritySize; i++) {
+      code[i] = 0;
+    }
+    for (int i = 0; i < stripeSize; i++) {
+      data[i] = readBufs[i][idx] & 0x000000FF;
+    }
+    reedSolomonCode.encode(data, code);
+    for (int i = 0; i < paritySize; i++) {
+      writeBufs[i][idx] = (byte)code[i];
+    }
+  }
+
+  @Override
+  public Path getParityTempPath() {
+    return new Path(RaidNode.rsTempPrefix(conf));
+  }
+
+}

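Note: the 0x000000FF mask in performEncode() (and performDecode() above) is load-bearing. Java bytes are signed, so symbol values above 0x7F would otherwise sign-extend to negative ints, which are not valid byte-wide symbols for the Galois-field arithmetic. A two-line illustration, not part of this patch:

    public class ByteMaskSketch {
      public static void main(String[] args) {
        byte b = (byte) 0xC8;
        int signExtended = b;      // -56: sign-extended, unusable as a symbol
        int symbol = b & 0xFF;     // 200: the intended byte-wide symbol value
        System.out.println(signExtended + " vs " + symbol);
      }
    }
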
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java Fri Nov 12 00:17:22 2010
@@ -55,4 +55,9 @@ public class XOREncoder extends Encoder 
       parityIn.close();
     }
   }
+
+  @Override
+  public Path getParityTempPath() {
+    return new Path(RaidNode.unraidTmpDirectory(conf));
+  }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java Fri Nov 12 00:17:22 2010
@@ -49,14 +49,26 @@ public class PolicyInfo implements Writa
 
   private Path srcPath;            // the specified src path
   private String policyName;       // name of policy
-  private String destinationPath;  // A destination path for this policy
+  private ErasureCodeType codeType;// the erasure code used
   private String description;      // A verbose description of this policy
   private Configuration conf;      // Hadoop configuration
 
   private Properties properties;   // Policy-dependent properties
 
   private ReentrantReadWriteLock plock; // protects policy operations.
-  
+  public static enum ErasureCodeType {
+    XOR, RS;
+    public static ErasureCodeType fromString(String s) {
+      if (XOR.toString().equalsIgnoreCase(s)) {
+        return XOR;
+      }
+      if (RS.toString().equalsIgnoreCase(s)) {
+        return RS;
+      }
+      return null;
+    }
+  }
+
   /**
    * Create the empty object
    */
@@ -82,6 +94,31 @@ public class PolicyInfo implements Writa
   }
 
   /**
+   * Copy fields from another PolicyInfo
+   */
+  public void copyFrom(PolicyInfo other) {
+    if (other.conf != null) {
+      this.conf = other.conf;
+    }
+    if (other.policyName != null && other.policyName.length() > 0) {
+      this.policyName = other.policyName;
+    }
+    if (other.description != null && other.description.length() > 0) {
+      this.description = other.description;
+    }
+    if (other.codeType != null) {
+      this.codeType = other.codeType;
+    }
+    if (other.srcPath != null) {
+      this.srcPath = other.srcPath;
+    }
+    for (Object key : other.properties.keySet()) {
+      String skey = (String) key;
+      this.properties.setProperty(skey, other.properties.getProperty(skey));
+    }
+  }
+
+  /**
    * Sets the input path on which this policy has to be applied
    */
   public void setSrcPath(String in) throws IOException {
@@ -90,10 +127,10 @@ public class PolicyInfo implements Writa
   }
 
   /**
-   * Set the destination path of this policy.
+   * Set the erasure code type used in this policy
    */
-  public void setDestinationPath(String des) {
-    this.destinationPath = des;
+  public void setErasureCode(String code) {
+    this.codeType = ErasureCodeType.fromString(code);
   }
 
   /**
@@ -130,8 +167,8 @@ public class PolicyInfo implements Writa
   /**
    * Get the destination path of this policy.
    */
-  public String getDestinationPath() {
-    return this.destinationPath;
+  public ErasureCodeType getErasureCode() {
+    return this.codeType;
   }
 
   /**
@@ -167,7 +204,7 @@ public class PolicyInfo implements Writa
     StringBuffer buff = new StringBuffer();
     buff.append("Policy Name:\t" + policyName + " --------------------\n");
     buff.append("Source Path:\t" + srcPath + "\n");
-    buff.append("Dest Path:\t" + destinationPath + "\n");
+    buff.append("Erasure Code:\t" + codeType + "\n");
     for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
       String name = (String) e.nextElement(); 
       buff.append( name + ":\t" + properties.getProperty(name) + "\n");
@@ -195,7 +232,7 @@ public class PolicyInfo implements Writa
   public void write(DataOutput out) throws IOException {
     Text.writeString(out, srcPath.toString());
     Text.writeString(out, policyName);
-    Text.writeString(out, destinationPath);
+    Text.writeString(out, codeType.toString());
     Text.writeString(out, description);
     out.writeInt(properties.size());
     for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
@@ -208,7 +245,7 @@ public class PolicyInfo implements Writa
   public void readFields(DataInput in) throws IOException {
     this.srcPath = new Path(Text.readString(in));
     this.policyName = Text.readString(in);
-    this.destinationPath = Text.readString(in);
+    this.codeType = ErasureCodeType.fromString(Text.readString(in));
     this.description = Text.readString(in);
     for (int n = in.readInt(); n>0; n--) {
       String name = Text.readString(in);

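Note: ErasureCodeType.fromString() is case-insensitive and returns null for unrecognized strings, while write() unconditionally calls codeType.toString(); callers should therefore validate the configured code before serializing a policy. A small sketch, not part of this patch:

    import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;

    public class CodeTypeParseSketch {
      public static void main(String[] args) {
        System.out.println(ErasureCodeType.fromString("rs"));    // RS
        System.out.println(ErasureCodeType.fromString("XOR"));   // XOR
        System.out.println(ErasureCodeType.fromString("raid5")); // null: guard
        // against this before PolicyInfo.write() dereferences codeType.
      }
    }
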
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Fri Nov 12 00:17:22 2010
@@ -73,7 +73,8 @@ public class TestRaidDfs extends TestCas
   RaidNode cnode = null;
   String jobTrackerName = null;
 
-  private void mySetup() throws Exception {
+  private void mySetup(String erasureCode, int stripeLength,
+      int rsParityLength) throws Exception {
 
     new File(TEST_DIR).mkdirs(); // Make sure data directory exists
     conf = new Configuration();
@@ -81,6 +82,7 @@ public class TestRaidDfs extends TestCas
     conf.set("raid.config.file", CONFIG_FILE);
     conf.setBoolean("raid.config.reload", true);
     conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+    conf.setInt(RaidNode.RS_PARITY_LENGTH_KEY, rsParityLength);
 
     // scan all policies once every 5 second
     conf.setLong("raid.policy.rescan.interval", 5000);
@@ -92,8 +94,9 @@ public class TestRaidDfs extends TestCas
     conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
 
     conf.set("raid.server.address", "localhost:0");
-    conf.setInt("hdfs.raid.stripeLength", 3);
-    conf.set("hdfs.raid.locations", "/destraid");
+    conf.setInt("hdfs.raid.stripeLength", stripeLength);
+    conf.set("xor".equals(erasureCode) ? RaidNode.RAID_LOCATION_KEY :
+             RaidNode.RAIDRS_LOCATION_KEY, "/destraid");
 
     dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
     dfs.waitActive();
@@ -108,7 +111,7 @@ public class TestRaidDfs extends TestCas
     String str = "<configuration> " +
                    "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
                      "<policy name = \"RaidTest1\"> " +
-                        "<destPath> /destraid</destPath> " +
+                        "<erasureCode>" + erasureCode + "</erasureCode> " +
                         "<property> " +
                           "<name>targetReplication</name> " +
                           "<value>1</value> " + 
@@ -231,47 +234,28 @@ public class TestRaidDfs extends TestCas
   }
 
   /**
-   * Create a file, corrupt a block in it and ensure that the file can be
-   * read through DistributedRaidFileSystem.
+   * Create a file, corrupt several blocks in it, and ensure that the file can
+   * still be read through DistributedRaidFileSystem using Reed-Solomon coding.
    */
-  public void testRaidDfs() throws Exception {
+  public void testRaidDfsRs() throws Exception {
     LOG.info("Test testRaidDfs started.");
 
     long blockSize = 8192L;
     int numBlocks = 8;
-    int repl = 1;
-    mySetup();
+    int stripeLength = 3;
+    mySetup("rs", stripeLength, 3);
 
     // Create an instance of the RaidNode
     Configuration localConf = new Configuration(conf);
     localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
     cnode = RaidNode.createRaidNode(null, localConf);
-
-    Path file = new Path("/user/dhruba/raidtest/file");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
-    int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
+    int[][] corrupt = {{1, 2, 3}, {1, 4, 7}, {3, 6, 7}};
     try {
-      long crc = createTestFilePartialLastBlock(fileSys, file, repl,
-                  numBlocks, blockSize);
-      long length = fileSys.getFileStatus(file).getLen();
-      waitForFileRaided(LOG, fileSys, file, destPath);
-      LocatedBlocks locations = getBlockLocations(file);
-
       for (int i = 0; i < corrupt.length; i++) {
-        int blockNumToCorrupt = corrupt[i][0];
-        LOG.info("Corrupt block " + blockNumToCorrupt + " of file");
-        corruptBlock(file, locations.get(blockNumToCorrupt).getBlock(),
-          NUM_DATANODES, false);
-        validateFile(getRaidFS(), file, length, crc);
-      }
-
-      // Corrupt one more block. This is expected to fail.
-      LOG.info("Corrupt one more block of file");
-      corruptBlock(file, locations.get(1).getBlock(), NUM_DATANODES, false);
-      try {
-        validateFile(getRaidFS(), file, length, crc);
-        fail("Expected exception ChecksumException not thrown!");
-      } catch (org.apache.hadoop.fs.ChecksumException e) {
+        Path file = new Path("/user/dhruba/raidtest/file" + i);
+        corruptBlockAndValidate(
+            file, destPath, corrupt[i], blockSize, numBlocks);
       }
     } catch (Exception e) {
       LOG.info("testRaidDfs Exception " + e +
@@ -288,7 +272,7 @@ public class TestRaidDfs extends TestCas
    * Test DistributedRaidFileSystem.readFully()
    */
   public void testReadFully() throws Exception {
-    mySetup();
+    mySetup("xor", 3, 1);
 
     try {
       Path file = new Path("/user/raid/raidtest/file1");
@@ -328,7 +312,7 @@ public class TestRaidDfs extends TestCas
     long blockSize = 8192L;
     int numBlocks = 8;
     int repl = 1;
-    mySetup();
+    mySetup("xor", 3, 1);
 
     Path file = new Path("/user/dhruba/raidtest/file");
     Path destPath = new Path("/destraid/user/dhruba/raidtest");
@@ -351,6 +335,41 @@ public class TestRaidDfs extends TestCas
       myTearDown();
     }
   }
+  /**
+   * Create a file, corrupt a block in it and ensure that the file can be
+   * read through DistributedRaidFileSystem by XOR code.
+   */
+  public void testRaidDfsXor() throws Exception {
+    LOG.info("Test testRaidDfs started.");
+
+    long blockSize = 8192L;
+    int numBlocks = 8;
+    int stripeLength = 3;
+    mySetup("xor", stripeLength, 1);
+
+    // Create an instance of the RaidNode
+    Configuration localConf = new Configuration(conf);
+    localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+    cnode = RaidNode.createRaidNode(null, localConf);
+
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
+    try {
+      for (int i = 0; i < corrupt.length; i++) {
+        Path file = new Path("/user/dhruba/raidtest/" + i);
+        corruptBlockAndValidate(
+            file, destPath, corrupt[i], blockSize, numBlocks);
+      }
+    } catch (Exception e) {
+      LOG.info("testRaidDfs Exception " + e +
+                StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      if (cnode != null) { cnode.stop(); cnode.join(); }
+      myTearDown();
+    }
+    LOG.info("Test testRaidDfs completed.");
+  }
 
   //
   // creates a file and populate it with random data. Returns its crc.