Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2009/10/06 09:31:12 UTC

svn commit: r822153 [2/3] - in /hadoop/hdfs/trunk: ./ src/contrib/ src/contrib/raid/ src/contrib/raid/conf/ src/contrib/raid/ivy/ src/contrib/raid/lib/ src/contrib/raid/src/ src/contrib/raid/src/java/ src/contrib/raid/src/java/org/ src/contrib/raid/src...

Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,1250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.lang.Thread;
+import java.net.InetSocketAddress;
+import org.xml.sax.SAXException;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.ipc.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.raid.protocol.RaidProtocol;
+import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
+
+/**
+ * A {@link RaidNode} that implements the {@link RaidProtocol}.
+ */
+public class RaidNode implements RaidProtocol {
+  public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
+  public static final long SLEEP_TIME = 10000L; // 10 seconds
+  public static final int DEFAULT_PORT = 60000;
+  public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+  public static final String DEFAULT_RAID_LOCATION = "/raid";
+
+  /** RPC server */
+  private Server server;
+  /** RPC server address */
+  private InetSocketAddress serverAddress = null;
+  /** only used for testing purposes  */
+  private boolean stopRequested = false;
+
+  /** Configuration Manager */
+  private ConfigManager configMgr;
+
+  /** hadoop configuration */
+  private Configuration conf;
+
+  protected boolean initialized;  // Are we initialized?
+  protected volatile boolean running; // Are we running?
+
+  /** Daemon thread to trigger policies */
+  Daemon triggerThread = null;
+
+  /** Daemon thread to delete obsolete parity files */
+  Daemon purgeThread = null;
+
+  /** Whether to raid files locally instead of using distributed raiding */
+  boolean isRaidLocal = false;
+  
+  // statistics about RAW hdfs blocks. This counts all replicas of a block.
+  public static class Statistics {
+    long numProcessedBlocks; // total blocks encountered in namespace
+    long processedSize;   // disk space occupied by all blocks
+    long remainingSize;      // total disk space post RAID
+    
+    long numMetaBlocks;      // total blocks in metafile
+    long metaSize;           // total disk space for meta files
+
+    public void clear() {
+      numProcessedBlocks = 0;
+      processedSize = 0;
+      remainingSize = 0;
+      numMetaBlocks = 0;
+      metaSize = 0;
+    }
+    public String toString() {
+      long save = processedSize - (remainingSize + metaSize);
+      long savep = 0;
+      if (processedSize > 0) {
+        savep = (save * 100)/processedSize;
+      }
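+      // Illustrative example (values assumed, not from the source): with
+      // processedSize = 300 (say, 100 raw bytes at replication 3),
+      // remainingSize = 100 and metaSize = 40, save = 160 and
+      // savep = 160 * 100 / 300 = 53 (integer division).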
+      String msg = " numProcessedBlocks = " + numProcessedBlocks +
+                   " processedSize = " + processedSize +
+                   " postRaidSize = " + remainingSize +
+                   " numMetaBlocks = " + numMetaBlocks +
+                   " metaSize = " + metaSize +
+                   " %save in raw disk space = " + savep;
+      return msg;
+    }
+  }
+
+  // Startup options
+  static public enum StartupOption{
+    TEST ("-test"),
+    REGULAR ("-regular");
+
+    private String name = null;
+    private StartupOption(String arg) {this.name = arg;}
+    public String getName() {return name;}
+  }
+  
+  /**
+   * Start RaidNode.
+   * <p>
+   * The raid-node can be started with one of the following startup options:
+   * <ul> 
+   * <li>{@link StartupOption#REGULAR REGULAR} - normal raid node startup</li>
+   * </ul>
+   * The option is passed via configuration field: 
+   * <tt>fs.raidnode.startup</tt>
+   * 
+   * The conf will be modified to reflect the actual ports on which 
+   * the RaidNode is up and running if the user passes the port as
+   * <code>zero</code> in the conf.
+   * 
+   * @param conf  configuration
+   * @throws IOException
+   */
+
+  RaidNode(Configuration conf) throws IOException {
+    try {
+      initialize(conf);
+    } catch (IOException e) {
+      this.stop();
+      throw e;
+    } catch (Exception e) {
+      this.stop();
+      throw new IOException(e);
+    }
+  }
+
+  public long getProtocolVersion(String protocol,
+                                 long clientVersion) throws IOException {
+    if (protocol.equals(RaidProtocol.class.getName())) {
+      return RaidProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
+    }
+  }
+
+  /**
+   * Wait for service to finish.
+   * (Normally, it runs forever.)
+   */
+  public void join() {
+    try {
+      if (server != null) server.join();
+      if (triggerThread != null) triggerThread.join();
+      if (purgeThread != null) purgeThread.join();
+    } catch (InterruptedException ie) {
+      // do nothing
+    }
+  }
+  
+  /**
+   * Stop all RaidNode threads and wait for all to finish.
+   */
+  public void stop() {
+    if (stopRequested) {
+      return;
+    }
+    stopRequested = true;
+    running = false;
+    if (server != null) server.stop();
+    if (triggerThread != null) triggerThread.interrupt();
+    if (purgeThread != null) purgeThread.interrupt();
+  }
+
+  private static InetSocketAddress getAddress(String address) {
+    return NetUtils.createSocketAddr(address);
+  }
+
+  public static InetSocketAddress getAddress(Configuration conf) {
+    String nodeport = conf.get("raid.server.address");
+    if (nodeport == null) {
+      nodeport = "localhost:" + DEFAULT_PORT;
+    }
+    return getAddress(nodeport);
+  }
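+  // Example (assumed values): setting raid.server.address to
+  // "raidnode.example.com:60000" makes both the RPC server and its clients
+  // use that address; if the key is absent, localhost:DEFAULT_PORT is used.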
+
+
+  private void initialize(Configuration conf) 
+    throws IOException, SAXException, InterruptedException, RaidConfigurationException,
+           ClassNotFoundException, ParserConfigurationException {
+    this.conf = conf;
+    InetSocketAddress socAddr = RaidNode.getAddress(conf);
+    int handlerCount = conf.getInt("fs.raidnode.handler.count", 10);
+
+    isRaidLocal = conf.getBoolean("fs.raidnode.local", false);
+    // read in the configuration
+    configMgr = new ConfigManager(conf);
+
+    // create rpc server 
+    this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
+                                handlerCount, false, conf);
+
+    // The rpc-server port can be ephemeral... ensure we have the correct info
+    this.serverAddress = this.server.getListenerAddress();
+    LOG.info("RaidNode up at: " + this.serverAddress);
+
+    initialized = true;
+    running = true;
+    this.server.start(); // start RPC server
+
+    // start the daemon thread to fire policies appropriately
+    this.triggerThread = new Daemon(new TriggerMonitor());
+    this.triggerThread.start();
+
+    // start the thread that deletes obsolete parity files
+    this.purgeThread = new Daemon(new PurgeMonitor());
+    this.purgeThread.start();
+  }
+
+  /**
+   * Implement RaidProtocol methods
+   */
+
+  /** {@inheritDoc} */
+  public PolicyList[] getAllPolicies() throws IOException {
+    Collection<PolicyList> list = configMgr.getAllPolicies();
+    return list.toArray(new PolicyList[list.size()]);
+  }
+
+  /** {@inheritDoc} */
+  public ReturnStatus recoverFile(String inStr, long corruptOffset) throws IOException {
+
+    LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
+    Path inputPath = new Path(inStr);
+    Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
+    PolicyInfo info = findMatchingPolicy(srcPath);
+    if (info != null) {
+
+      // find stripe length from config
+      int stripeLength = getStripeLength(conf, info);
+
+      // create destination path prefix
+      String destPrefix = getDestinationPath(conf, info);
+      Path destPath = new Path(destPrefix.trim());
+      FileSystem fs = FileSystem.get(destPath.toUri(), conf);
+      destPath = destPath.makeQualified(fs);
+
+      unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
+    }
+    return ReturnStatus.SUCCESS;
+  }
+
+  /**
+   * Periodically checks to see which policies should be fired.
+   */
+  class TriggerMonitor implements Runnable {
+    /**
+     */
+    public void run() {
+      while (running) {
+        try {
+          doProcess();
+        } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+        } finally {
+          LOG.info("Trigger thread continuing to run...");
+        }
+      }
+    }
+
+
+    /**
+     * Keep processing policies.
+     * If the config file has changed, then reload config file and start afresh.
+     */
+    private void doProcess() throws IOException, InterruptedException {
+      PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+
+      long prevExec = 0;
+      DistRaid dr = null;
+      while (running) {
+
+        boolean reload = configMgr.reloadConfigsIfNecessary();
+        while(!reload && now() < prevExec + configMgr.getPeriodicity()){
+          Thread.sleep(SLEEP_TIME);
+          reload = configMgr.reloadConfigsIfNecessary();
+        }
+
+        prevExec = now();
+        
+        // activate all categories
+        Collection<PolicyList> all = configMgr.getAllPolicies();
+        
+        // sort all policies by reverse lexicographical order. This is needed
+        // to make the nearest policy take precedence.
+        PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+        Arrays.sort(sorted, lexi);
+
+        if (!isRaidLocal) {
+          dr = new DistRaid(conf);
+        }
+        // paths we have processed so far
+        List<String> processed = new LinkedList<String>();
+        
+        for (PolicyList category : sorted) {
+          for (PolicyInfo info: category.getAll()) {
+
+            long modTimePeriod = 0;
+            short srcReplication = 0;
+            String str = info.getProperty("modTimePeriod");
+            if (str != null) {
+               modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod")); 
+            }
+            str = info.getProperty("srcReplication");
+            if (str != null) {
+               srcReplication = Short.parseShort(info.getProperty("srcReplication")); 
+            }
+
+            LOG.info("Triggering Policy Filter " + info.getName() +
+                     " " + info.getSrcPath());
+            List<FileStatus> filteredPaths = null;
+            try { 
+              filteredPaths = selectFiles(conf, info.getSrcPath(), 
+                                          getDestinationPath(conf, info),
+                                          modTimePeriod,
+                                          srcReplication,
+                                          prevExec);
+            } catch (Exception e) {
+              LOG.info("Exception while invoking filter on policy " + info.getName() +
+                       " srcPath " + info.getSrcPath() + 
+                       " exception " + StringUtils.stringifyException(e));
+              continue;
+            }
+
+            if (filteredPaths == null || filteredPaths.size() == 0) {
+              LOG.info("No filtered paths for policy " + info.getName());
+               continue;
+            }
+
+            // If any of the filtered paths has already been accepted 
+            // by a previous policy, then skip it.
+            for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
+              String fs = iter.next().getPath().toString() + "/";
+              for (String p : processed) {
+                if (p.startsWith(fs)) {
+                  iter.remove();
+                  break;
+                }
+              }
+            }
+
+            // Apply the action on accepted paths
+            LOG.info("Triggering Policy Action " + info.getName());
+            try {
+              if (isRaidLocal) {
+                doRaid(conf, info, filteredPaths);
+              } else {
+                // add paths for distributed raiding
+                dr.addRaidPaths(info, filteredPaths);
+              }
+            } catch (Exception e) {
+              LOG.info("Exception while invoking action on policy " + info.getName() +
+                       " srcPath " + info.getSrcPath() + 
+                       " exception " + StringUtils.stringifyException(e));
+              continue;
+            }
+
+            // add these paths to processed paths
+            for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
+              String p = iter.next().getPath().toString() + "/";
+              processed.add(p);
+            }
+          }
+        }
+        processed.clear(); // free up memory references before yielding
+
+        //do the distributed raiding
+        if (!isRaidLocal) {
+          dr.doDistRaid();
+        } 
+      }
+    }
+  }
+
+  /**
+   * Returns the policy that matches the specified path.
+   * This returns the first policy that matches the input path. Since different
+   * policies with different purposes and destinations might be associated with the
+   * same input path, callers should use this method with care, and we should try
+   * to change the code to avoid relying on it.
+   */
+  private PolicyInfo findMatchingPolicy(Path inpath) throws IOException {
+    PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+    Collection<PolicyList> all = configMgr.getAllPolicies();
+        
+    // sort all policies by reverse lexicographical order. This is needed
+    // to make the nearest policy take precedence.
+    PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+    Arrays.sort(sorted, lexi);
+
+    // loop through all categories of policies.
+    for (PolicyList category : sorted) {
+      PolicyInfo first = category.getAll().iterator().next();
+      if (first != null) {
+        Path[] srcPaths = first.getSrcPathExpanded(); // input src paths unglobbed
+        if (srcPaths == null) {
+          continue;
+        }
+
+        for (Path src: srcPaths) {
+          if (inpath.toString().startsWith(src.toString())) {
+            // if the srcpath is a prefix of the specified path
+            // we have a match! 
+            return first;
+          }
+        }
+      }
+    }
+    return null; // no matching policies
+  }
+
+ /**
+  * Returns a list of pathnames that need raiding.
+  */
+  private List<FileStatus> selectFiles(Configuration conf, Path p, String destPrefix,
+                                       long modTimePeriod, short srcReplication, long now) throws IOException {
+
+    List<FileStatus> returnSet = new LinkedList<FileStatus>();
+
+    // expand destination prefix path
+    Path destp = new Path(destPrefix.trim());
+    FileSystem fs = FileSystem.get(destp.toUri(), conf);
+    destp = destp.makeQualified(fs);
+
+    fs = p.getFileSystem(conf);
+    FileStatus[] gpaths = fs.globStatus(p);
+    if (gpaths != null){
+      for (FileStatus onepath: gpaths) {
+        recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now);
+      }
+    }
+    return returnSet;
+  }
+
+  /**
+   * Pick files that need to be RAIDed.
+   */
+  private void recurse(FileSystem srcFs,
+                       Configuration conf,
+                       Path destPathPrefix,
+                       FileStatus src,
+                       List<FileStatus> accept,
+                       long modTimePeriod, 
+                       short srcReplication, 
+                       long now) throws IOException {
+    Path path = src.getPath();
+    FileStatus[] files = null;
+    try {
+      files = srcFs.listStatus(path);
+    } catch (java.io.FileNotFoundException e) {
+      // ignore error because the file could have been deleted by a user
+      LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e));
+    } catch (IOException e) {
+      throw e;
+    }
+
+    // If the modtime of the parity file does not match the modtime of the
+    // src file, and the src file has not been modified
+    // recently, then that file is a candidate for RAID.
+
+    if (!src.isDir()) {      // a file
+
+      // if the source file spans two blocks or fewer, there is no need to RAID it
+      long blockSize = src.getBlockSize();
+      if (2 * blockSize >= src.getLen()) {
+        return;
+      }
+
+      // If the replication factor of the source file is already below what is
+      // expected by RAID, then ignore it.
+      if (src.getReplication() < srcReplication) {
+        return;
+      }
+
+      // check if the destination path already exists. If it does and its modification time
+      // does not match the modTime of the source file, then recalculate RAID
+      boolean add = false;
+      try {
+        Path outpath =  new Path(destPathPrefix, makeRelative(path));
+        FileSystem outFs = outpath.getFileSystem(conf);
+        FileStatus ostat = outFs.getFileStatus(outpath);
+        if (ostat.getModificationTime() != src.getModificationTime() &&
+            src.getModificationTime() + modTimePeriod < now) {
+          add = true;
+         }
+      } catch (java.io.FileNotFoundException e) {
+        add = true; // destination file does not exist
+      }
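+      // Illustrative example (values assumed): with a modTimePeriod of one hour,
+      // a source file written ten minutes ago is skipped even if its parity
+      // file is stale; it only becomes a candidate after it has been idle for
+      // longer than modTimePeriod.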
+
+      if (add) {
+        accept.add(src);
+      }
+      return;
+
+    } else if (files != null) {
+      for (FileStatus one:files) {
+        recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
+      }
+    }
+  }
+
+
+  /**
+   * RAID a list of files.
+   */
+  void doRaid(Configuration conf, PolicyInfo info, List<FileStatus> paths)
+      throws IOException {
+    int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+    int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
+    int stripeLength = getStripeLength(conf, info);
+    String destPrefix = getDestinationPath(conf, info);
+    String simulate = info.getProperty("simulate");
+    boolean doSimulate = simulate == null ? false : Boolean
+        .parseBoolean(simulate);
+
+    Statistics statistics = new Statistics();
+    int count = 0;
+
+    Path p = new Path(destPrefix.trim());
+    FileSystem fs = FileSystem.get(p.toUri(), conf);
+    p = p.makeQualified(fs);
+
+    for (FileStatus s : paths) {
+      doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl,
+          stripeLength);
+      if (count % 1000 == 0) {
+        LOG.info("RAID statistics " + statistics.toString());
+      }
+      count++;
+    }
+    LOG.info("RAID statistics " + statistics.toString());
+  }
+
+  
+  /**
+   * RAID an individual file
+   */
+
+  static public void doRaid(Configuration conf, PolicyInfo info,
+      FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
+    int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+    int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
+    int stripeLength = getStripeLength(conf, info);
+    String destPrefix = getDestinationPath(conf, info);
+    String simulate = info.getProperty("simulate");
+    boolean doSimulate = simulate == null ? false : Boolean
+        .parseBoolean(simulate);
+
+    int count = 0;
+
+    Path p = new Path(destPrefix.trim());
+    FileSystem fs = FileSystem.get(p.toUri(), conf);
+    p = p.makeQualified(fs);
+
+    doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl,
+        stripeLength);
+  }
+  
+  
+  /**
+   * RAID an individual file
+   */
+  static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
+                      Statistics statistics, Reporter reporter, boolean doSimulate,
+                      int targetRepl, int metaRepl, int stripeLength) 
+    throws IOException {
+    Path p = stat.getPath();
+    FileSystem srcFs = p.getFileSystem(conf);
+
+    // extract block locations from File system
+    BlockLocation[] locations = srcFs.getFileBlockLocations(stat, 0, stat.getLen());
+    
+    // if the file has two blocks or fewer, then there is nothing to do
+    if (locations.length <= 2) {
+      return;
+    }
+
+    // add up the raw disk space occupied by this file
+    long diskSpace = 0;
+    for (BlockLocation l: locations) {
+      diskSpace += (l.getLength() * stat.getReplication());
+    }
+    statistics.numProcessedBlocks += locations.length;
+    statistics.processedSize += diskSpace;
+
+    // generate parity file
+    generateParityFile(conf, stat, reporter, srcFs, destPath, locations, metaRepl, stripeLength);
+
+    // reduce the replication factor of the source file
+    if (!doSimulate) {
+      if (srcFs.setReplication(p, (short)targetRepl) == false) {
+        LOG.info("Error in reducing relication factor of file " + p + " to " + targetRepl);
+        statistics.remainingSize += diskSpace;  // no change in disk space usage
+        return;
+      }
+    }
+
+    diskSpace = 0;
+    for (BlockLocation l: locations) {
+      diskSpace += (l.getLength() * targetRepl);
+    }
+    statistics.remainingSize += diskSpace;
+
+    // the metafile will have this many blocks
+    int numMeta = locations.length / stripeLength;
+    if (locations.length % stripeLength != 0) {
+      numMeta++;
+    }
+
+    // we create numMeta metablocks for every file. Each metablock has metaRepl replicas.
+    // the last block of the metafile might not be completely filled up, but we
+    // ignore that for now.
+    statistics.numMetaBlocks += (numMeta * metaRepl);
+    statistics.metaSize += (numMeta * metaRepl * stat.getBlockSize());
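+    // Illustrative example (numbers assumed): a 12-block file with
+    // stripeLength = 5 yields numMeta = ceil(12 / 5) = 3 parity blocks, so with
+    // metaRepl = 2 the statistics above account for 6 meta block replicas.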
+  }
+
+  /**
+   * Create the parity file.
+   */
+  static private void generateParityFile(Configuration conf, FileStatus stat,
+                                  Reporter reporter,
+                                  FileSystem inFs,
+                                  Path destPathPrefix, BlockLocation[] locations,
+                                  int metaRepl, int stripeLength) throws IOException {
+
+    // two buffers for generating parity
+    int bufSize = 5 * 1024 * 1024; // 5 MB
+    byte[] bufs = new byte[bufSize];
+    byte[] xor = new byte[bufSize];
+    byte zero = 0;
+
+    Path inpath = stat.getPath();
+    long blockSize = stat.getBlockSize();
+    long fileSize = stat.getLen();
+
+    // create output tmp path
+    Path outpath =  new Path(destPathPrefix, makeRelative(inpath));
+    Path tmppath =  new Path(outpath + ".tmp");
+    FileSystem outFs = outpath.getFileSystem(conf);
+
+    // if the parity file is already up-to-date, then there is nothing to do
+    try {
+      FileStatus stmp = outFs.getFileStatus(outpath);
+      if (stmp.getModificationTime() == stat.getModificationTime()) {
+        LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath +
+                 " already up-to-date. Nothing more to do.");
+        return;
+      }
+    } catch (IOException e) {
+      // ignore errors because the raid file might not exist yet.
+    } 
+
+    LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath);
+    FSDataOutputStream out = outFs.create(tmppath, 
+                                          true, 
+                                          conf.getInt("io.file.buffer.size", 64 * 1024), 
+                                          (short)metaRepl, 
+                                          blockSize);
+
+    try {
+
+      // loop once for every stripe length
+      for (int startBlock = 0; startBlock < locations.length;) {
+
+        // report progress to Map-reduce framework
+        if (reporter != null) {
+          reporter.progress();
+        }
+        int blocksLeft = locations.length - startBlock;
+        int stripe = Math.min(stripeLength, blocksLeft);
+        LOG.info(" startBlock " + startBlock + " stripe " + stripe);
+
+        // open a new file descriptor for each block in this stripe.
+        // make each fd point to the beginning of each block in this stripe.
+        FSDataInputStream[] ins = new FSDataInputStream[stripe];
+        for (int i = 0; i < stripe; i++) {
+          ins[i] = inFs.open(inpath, bufSize);
+          ins[i].seek(blockSize * (startBlock + i));
+        }
+
+        generateParity(ins,out,blockSize,bufs,xor);
+        
+        // close input file handles
+        for (int i = 0; i < ins.length; i++) {
+          ins[i].close();
+        }
+
+        // increment startBlock to point to the first block to be processed
+        // in the next iteration
+        startBlock += stripe;
+      }
+      out.close();
+      out = null;
+
+      // rename tmppath to the real parity filename
+      if (!outFs.rename(tmppath, outpath)) {
+        String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
+        LOG.warn(msg);
+        throw new IOException (msg);
+      }
+    } finally {
+      // remove the tmp file if it still exists
+      outFs.delete(tmppath, false);  
+    }
+
+    // set the modification time of the RAID file. This is done so that the modTime of the
+    // RAID file reflects the contents of the source file that it has RAIDed. This should
+    // also work for files that are being appended to. This is necessary because the time
+    // on the destination namenode may not be synchronised with the timestamp of the
+    // source namenode.
+    outFs.setTimes(outpath, stat.getModificationTime(), -1);
+
+    FileStatus outstat = outFs.getFileStatus(outpath);
+    LOG.info("Source file " + inpath + " of size " + fileSize +
+             " Parity file " + outpath + " of size " + outstat.getLen() +
+             " src mtime " + stat.getModificationTime()  +
+             " parity mtime " + outstat.getModificationTime());
+  }
+
+  private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead) 
+      throws IOException {
+
+    int tread = 0;
+    
+    while (tread < toRead) {
+      int read = ins.read(bufs, tread, toRead - tread);
+      if (read == -1) {
+        return tread;
+      } else {
+        tread += read;
+      }
+    }
+    
+    return tread;
+  }
+  
+  private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout, 
+      long parityBlockSize, byte[] bufs, byte[] xor) throws IOException {
+    
+    int bufSize;
+    if ((bufs == null) || (bufs.length == 0)){
+      bufSize = 5 * 1024 * 1024; // 5 MB
+      bufs = new byte[bufSize];
+    } else {
+      bufSize = bufs.length;
+    }
+    if ((xor == null) || (xor.length != bufs.length)){
+      xor = new byte[bufSize];
+    }
+
+    int xorlen = 0;
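+    // The loop below computes plain XOR parity: for data blocks b0..b(k-1) in a
+    // stripe, the parity byte at offset j is b0[j] ^ b1[j] ^ ... ^ b(k-1)[j],
+    // with positions past the end of shorter blocks treated as zero. Any single
+    // missing block can then be rebuilt by XOR-ing the parity with the
+    // remaining blocks.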
+      
+    // this loop processes all good blocks in selected stripe
+    long remaining = parityBlockSize;
+    
+    while (remaining > 0) {
+      int toRead = (int)Math.min(remaining, bufSize);
+
+      if (ins.length > 0) {
+        xorlen = readInputUntilEnd(ins[0], xor, toRead);
+      }
+
+      // read all remaining blocks and xor them into the buffer
+      for (int i = 1; i < ins.length; i++) {
+        
+        int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
+        
+        int j;
+        int xorlimit = (int) Math.min(xorlen,actualRead);
+        for (j = 0; j < xorlimit; j++) {
+          xor[j] ^= bufs[j];
+        }
+        if ( actualRead > xorlen ){
+          for (; j < actualRead; j++) {
+            xor[j] = bufs[j];
+          }
+          xorlen = actualRead;
+        }
+        
+      }
+
+      if (xorlen < toRead) {
+        Arrays.fill(bufs, xorlen, toRead, (byte) 0);
+      }
+      
+      // write this to the tmp file
+      fout.write(xor, 0, toRead);
+      remaining -= toRead;
+    }
+  
+  }
+  
+  /**
+   * Extract a good block from the parity file. This assumes that the corruption
+   * is in the main file and the parity file is always good.
+   */
+  public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix, 
+                            int stripeLength, long corruptOffset) throws IOException {
+
+    // extract block locations, size etc from source file
+    Random rand = new Random();
+    FileSystem srcFs = srcPath.getFileSystem(conf);
+    FileStatus srcStat = srcFs.getFileStatus(srcPath);
+    BlockLocation[] locations = srcFs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
+    long blockSize = srcStat.getBlockSize();
+    long fileSize = srcStat.getLen();
+    int totalBlocks = locations.length;
+
+    // find the stripe number where the corrupted offset lies
+    long snum = corruptOffset / (stripeLength * blockSize);
+    long startOffset = snum * stripeLength * blockSize;
+    long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize;
+    long corruptBlockSize = Math.min(blockSize, fileSize - startOffset);
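+    // Worked example (values assumed): with 64 MB blocks and stripeLength = 5,
+    // a corruptOffset of 400 MB gives snum = 400 / 320 = 1, startOffset = 320 MB
+    // and corruptBlockInStripe = (400 - 320) / 64 = 1, i.e. the second block of
+    // the second stripe.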
+
+    LOG.info("Start offset of relevent stripe = " + startOffset +
+             " corruptBlockInStripe " + corruptBlockInStripe);
+
+    // open file descriptors to read all good blocks of the file
+    FSDataInputStream[] instmp = new FSDataInputStream[stripeLength];
+    int  numLength = 0;
+    for (int i = 0; i < stripeLength; i++) {
+      if (i == corruptBlockInStripe) {
+        continue;  // do not open corrupt block
+      }
+      if (startOffset + i * blockSize >= fileSize) {
+        LOG.info("Stop offset of relevent stripe = " + 
+                  startOffset + i * blockSize);
+        break;
+      }
+      instmp[numLength] = srcFs.open(srcPath);
+      instmp[numLength].seek(startOffset + i * blockSize);
+      numLength++;
+    }
+
+    // create array of inputstream, allocate one extra slot for 
+    // parity file. numLength could be smaller than stripeLength
+    // if we are processing the last partial stripe on a file.
+    numLength += 1;
+    FSDataInputStream[] ins = new FSDataInputStream[numLength];
+    for (int i = 0; i < numLength-1; i++) {
+      ins[i] = instmp[i];
+    }
+    LOG.info("Decompose a total of " + numLength + " blocks.");
+
+    // open and seek to the appropriate offset in parity file.
+    Path parityFile =  new Path(destPathPrefix, makeRelative(srcPath));
+    FileSystem parityFs = parityFile.getFileSystem(conf);
+    LOG.info("Parity file for " + srcPath + " is " + parityFile);
+    ins[numLength-1] = parityFs.open(parityFile);
+    ins[numLength-1].seek(snum * blockSize);
+    LOG.info("Parity file " + parityFile +
+             " seeking to relevent block at offset " + 
+             ins[numLength-1].getPos());
+
+    // create a temporary filename in the source filesystem
+    // do not overwrite an existing tmp file. Make it fail for now.
+    // We need to generate a unique name for this tmp file later on.
+    Path tmpFile = null;
+    FSDataOutputStream fout = null;
+    int retry = 5;
+    while (fout == null) {
+      try {
+        tmpFile = new Path("/tmp/dhruba/" + rand.nextInt());
+        fout = parityFs.create(tmpFile, false);
+      } catch (IOException e) {
+        if (retry-- <= 0) {
+          LOG.info("Unable to create temporary file " + tmpFile +
+                   " Aborting....");
+          throw e;
+        }
+        LOG.info("Unable to create temporary file " + tmpFile +
+                 " Retrying....");
+      }
+    }
+    LOG.info("Created recovered block file " + tmpFile);
+
+    // buffers for generating parity bits
+    int bufSize = 5 * 1024 * 1024; // 5 MB
+    byte[] bufs = new byte[bufSize];
+    byte[] xor = new byte[bufSize];
+   
+    generateParity(ins,fout,corruptBlockSize,bufs,xor);
+    
+    // close all files
+    fout.close();
+    for (int i = 0; i < ins.length; i++) {
+      ins[i].close();
+    }
+
+    // Now, reopen the source file and the recovered block file
+    // and copy all relevant data to new file
+    Path recoveredPath =  new Path(destPathPrefix, makeRelative(srcPath));
+    recoveredPath = new Path(recoveredPath + ".recovered");
+    LOG.info("Creating recovered file " + recoveredPath);
+
+    FSDataInputStream sin = srcFs.open(srcPath);
+    FSDataOutputStream out = parityFs.create(recoveredPath, false, 
+                                             conf.getInt("io.file.buffer.size", 64 * 1024),
+                                             srcStat.getReplication(), 
+                                             srcStat.getBlockSize());
+
+    FSDataInputStream bin = parityFs.open(tmpFile);
+    long recoveredSize = 0;
+
+    // copy all the good blocks (up to the corruption)
+    // from source file to output file
+    long remaining = corruptOffset / blockSize * blockSize;
+    while (remaining > 0) {
+      int toRead = (int)Math.min(remaining, bufSize);
+      sin.readFully(bufs, 0, toRead);
+      out.write(bufs, 0, toRead);
+      remaining -= toRead;
+      recoveredSize += toRead;
+    }
+    LOG.info("Copied upto " + recoveredSize + " from src file. ");
+
+    // copy recovered block to output file
+    remaining = corruptBlockSize;
+    while (recoveredSize < fileSize &&
+           remaining > 0) {
+      int toRead = (int)Math.min(remaining, bufSize);
+      bin.readFully(bufs, 0, toRead);
+      out.write(bufs, 0, toRead);
+      remaining -= toRead;
+      recoveredSize += toRead;
+    }
+    LOG.info("Copied upto " + recoveredSize + " from recovered-block file. ");
+
+    // skip bad block in src file
+    if (recoveredSize < fileSize) {
+      sin.seek(sin.getPos() + corruptBlockSize); 
+    }
+
+    // copy remaining good data from src file to output file
+    while (recoveredSize < fileSize) {
+      int toRead = (int)Math.min(fileSize - recoveredSize, bufSize);
+      sin.readFully(bufs, 0, toRead);
+      out.write(bufs, 0, toRead);
+      recoveredSize += toRead;
+    }
+    out.close();
+    LOG.info("Completed writing " + recoveredSize + " bytes into " +
+             recoveredPath);
+              
+    sin.close();
+    bin.close();
+
+    // delete the temporary block file that was created.
+    parityFs.delete(tmpFile, false);
+    LOG.info("Deleted temporary file " + tmpFile);
+
+    // copy the meta information from source path to the newly created
+    // recovered path
+    copyMetaInformation(parityFs, srcStat, recoveredPath);
+
+    return recoveredPath;
+  }
+
+  /**
+   * Periodically delete orphaned parity files.
+   */
+  class PurgeMonitor implements Runnable {
+    /**
+     */
+    public void run() {
+      while (running) {
+        try {
+          doPurge();
+        } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+        } finally {
+          LOG.info("Purge parity files thread continuing to run...");
+        }
+      }
+    }
+
+    /**
+     * Delete orphaned files. The reason this is done by a separate thread 
+     * is to not burden the TriggerMonitor with scanning the 
+     * destination directories.
+     */
+    private void doPurge() throws IOException, InterruptedException {
+      PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+
+      long prevExec = 0;
+      while (running) {
+
+        // The config may be reloaded by the TriggerMonitor. 
+        // This thread uses whatever config is currently active.
+        while(now() < prevExec + configMgr.getPeriodicity()){
+          Thread.sleep(SLEEP_TIME);
+        }
+
+        prevExec = now();
+        
+        // fetch all categories
+        Collection<PolicyList> all = configMgr.getAllPolicies();
+        
+        // sort all policies by reverse lexicographical order. This is 
+        // needed to make the nearest policy take precedence.
+        PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+        Arrays.sort(sorted, lexi);
+
+        // paths we have processed so far
+        Set<Path> processed = new HashSet<Path>();
+        
+        for (PolicyList category : sorted) {
+          for (PolicyInfo info: category.getAll()) {
+
+            try {
+              // expand destination prefix path
+              String destinationPrefix = getDestinationPath(conf, info);
+              Path destp = new Path(destinationPrefix.trim());
+              FileSystem destFs = FileSystem.get(destp.toUri(), conf);
+              destp = destp.makeQualified(destFs);
+              destinationPrefix = destp.toUri().getPath(); // strip host:port
+              destp = new Path(destp, makeRelative(info.getSrcPath()));
+
+              // if this destination path has already been processed as part
+              // of another policy, then nothing more to do
+              if (processed.contains(destp)) {
+                LOG.info("Obsolete parity files for policy " + 
+                        info.getName() + " have already been processed.");
+                continue;
+              }
+              LOG.info("Purging obsolete parity files for policy " + 
+                        info.getName() + " " + destp);
+
+              FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
+              FileStatus stat = destFs.getFileStatus(destp);
+              if (stat != null) {
+                recursePurge(srcFs, destFs, destinationPrefix, stat);
+              }
+
+              // this destination path has already been processed
+              processed.add(destp);
+
+            } catch (Exception e) {
+              LOG.warn("Ignoring Exception while processing policy " + 
+                       info.getName() + " " + 
+                       StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+    }
+
+    /**
+     * The destPrefix is the absolute pathname of the destinationPath
+     * specified in the policy (without the host:port)
+     */ 
+    private void recursePurge(FileSystem srcFs, FileSystem destFs,
+                              String destPrefix, FileStatus dest) 
+      throws IOException {
+
+      Path destPath = dest.getPath(); // pathname, no host:port
+      String destStr = destPath.toUri().getPath();
+      LOG.debug("Checking " + destPath + " prefix " + destPrefix);
+
+      // Verify the destPrefix is a prefix of the destPath
+      if (!destStr.startsWith(destPrefix)) {
+        LOG.error("Destination path " + destStr + " should have " + 
+                  destPrefix + " as its prefix.");
+        return;
+      }
+      String src = destStr.replaceFirst(destPrefix, "");
+      
+      // if the source path does not exist, then delete the 
+      // destination path as well
+      Path srcPath = new Path(src);
+      if (!srcFs.exists(srcPath)) {
+        boolean done = destFs.delete(destPath, false);
+        if (done) {
+          LOG.info("Purged path " + destPath );
+        } else {
+          LOG.info("Unable to purge path " + destPath );
+        }
+        return;
+      }
+      if (!dest.isDir()) {
+        return;
+      }
+      FileStatus[] files = null;
+      files = destFs.listStatus(destPath);
+      if (files != null) {
+        for (FileStatus one:files) {
+          recursePurge(srcFs, destFs, destPrefix, one);
+        }
+      }
+    } 
+  }
+
+  /**
+   * If the config file has an entry for hdfs.raid.locations, then that overrides
+   * the destination path specified in the raid policy file
+   */
+  static private String getDestinationPath(Configuration conf, PolicyInfo info) {
+    String locs = conf.get("hdfs.raid.locations"); 
+    if (locs != null) {
+      return locs;
+    }
+    locs = info.getDestinationPath();
+    if (locs == null) {
+      return DEFAULT_RAID_LOCATION;
+    }
+    return locs;
+  }
+
+  /**
+   * If the config file has an entry for hdfs.raid.stripeLength, then that overrides
+   * the stripe length specified in the raid policy file
+   */
+  static private int getStripeLength(Configuration conf, PolicyInfo info)
+    throws IOException {
+    int len = conf.getInt("hdfs.raid.stripeLength", 0); 
+    if (len != 0) {
+      return len;
+    }
+    String str = info.getProperty("stripeLength");
+    if (str == null) {
+      String msg = "hdfs.raid.stripeLength is not defined." +
+                   " Using a default " + DEFAULT_STRIPE_LENGTH;
+      LOG.info(msg);
+      return DEFAULT_STRIPE_LENGTH;
+    }
+    return Integer.parseInt(str);
+  }
+
+  /**
+   * Copy the file owner, modtime, etc from srcPath to the recovered Path.
+   * It is possible that we might have to retrieve file permissions,
+   * quotas, etc. too in the future.
+   */
+  static private void copyMetaInformation(FileSystem fs, FileStatus stat, 
+                                          Path recoveredPath) 
+    throws IOException {
+    fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup());
+    fs.setPermission(recoveredPath, stat.getPermission());
+    fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime());
+  }
+
+  /**
+   * Returns current time.
+   */
+  static long now() {
+    return System.currentTimeMillis();
+  }
+
+  /**                       
+   * Make an absolute path relative by stripping the leading /
+   */   
+  static private Path makeRelative(Path path) {
+    if (!path.isAbsolute()) {
+      return path;
+    }          
+    String p = path.toUri().getPath();
+    String relative = p.substring(1, p.length());
+    return new Path(relative);
+  } 
+
+  private static void printUsage() {
+    System.err.println("Usage: java RaidNode ");
+  }
+
+  private static StartupOption parseArguments(String args[]) {
+    int argsLen = (args == null) ? 0 : args.length;
+    StartupOption startOpt = StartupOption.REGULAR;
+    for(int i=0; i < argsLen; i++) {
+      String cmd = args[i]; // TODO: parse command-line arguments in the future.
+    }
+    return startOpt;
+  }
+
+
+  /**
+   * Convert command line options to configuration parameters
+   */
+  private static void setStartupOption(Configuration conf, StartupOption opt) {
+    conf.set("fs.raidnode.startup", opt.toString());
+  }
+
+  /**
+   * Create an instance of the RaidNode 
+   */
+  public static RaidNode createRaidNode(String argv[],
+                                        Configuration conf) throws IOException {
+    if (conf == null) {
+      conf = new Configuration();
+    }
+    StartupOption startOpt = parseArguments(argv);
+    if (startOpt == null) {
+      printUsage();
+      return null;
+    }
+    setStartupOption(conf, startOpt);
+    RaidNode node = new RaidNode(conf);
+    return node;
+  }
+
+
+  /**
+   */
+  public static void main(String argv[]) throws Exception {
+    try {
+      StringUtils.startupShutdownMessage(RaidNode.class, argv, LOG);
+      RaidNode raid = createRaidNode(argv, null);
+      if (raid != null) {
+        raid.join();
+      }
+    } catch (Throwable e) {
+      LOG.error(StringUtils.stringifyException(e));
+      System.exit(-1);
+    }
+  }
+
+  
+
+}

Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.net.InetSocketAddress;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.ipc.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.raid.protocol.RaidProtocol;
+import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
+
+/**
+ * A {@link RaidShell} that allows browsing configured raid policies.
+ */
+public class RaidShell extends Configured implements Tool {
+  public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.RaidShell");
+  public RaidProtocol raidnode;
+  final RaidProtocol rpcRaidnode;
+  private UnixUserGroupInformation ugi;
+  volatile boolean clientRunning = true;
+  private Configuration conf;
+
+  /**
+   * Start RaidShell.
+   * <p>
+   * The RaidShell connects to the specified RaidNode and performs basic
+   * configuration and recovery commands.
+   * @throws IOException
+   */
+  public RaidShell() throws IOException {
+    this(new Configuration());
+  }
+
+  /**
+   * The RaidShell connects to the specified RaidNode and performs basic
+   * configuration and recovery commands.
+   * @param conf The Hadoop configuration
+   * @throws IOException
+   */
+  public RaidShell(Configuration conf) throws IOException {
+    super(conf);
+    try {
+      this.ugi = UnixUserGroupInformation.login(conf, true);
+    } catch (LoginException e) {
+      throw (IOException)(new IOException().initCause(e));
+    }
+
+    this.rpcRaidnode = createRPCRaidnode(RaidNode.getAddress(conf), conf, ugi);
+    this.raidnode = createRaidnode(rpcRaidnode);
+  }
+
+  public static RaidProtocol createRaidnode(Configuration conf) throws IOException {
+    return createRaidnode(RaidNode.getAddress(conf), conf);
+  }
+
+  public static RaidProtocol createRaidnode(InetSocketAddress raidNodeAddr,
+      Configuration conf) throws IOException {
+    try {
+      return createRaidnode(createRPCRaidnode(raidNodeAddr, conf,
+        UnixUserGroupInformation.login(conf, true)));
+    } catch (LoginException e) {
+      throw (IOException)(new IOException().initCause(e));
+    }
+  }
+
+  private static RaidProtocol createRPCRaidnode(InetSocketAddress raidNodeAddr,
+      Configuration conf, UnixUserGroupInformation ugi)
+    throws IOException {
+    LOG.info("RaidShell connecting to " + raidNodeAddr);
+    return (RaidProtocol)RPC.getProxy(RaidProtocol.class,
+        RaidProtocol.versionID, raidNodeAddr, ugi, conf,
+        NetUtils.getSocketFactory(conf, RaidProtocol.class));
+  }
+
+  private static RaidProtocol createRaidnode(RaidProtocol rpcRaidnode)
+    throws IOException {
+    RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+        5, 5000, TimeUnit.MILLISECONDS);
+
+    Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+
+    Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    exceptionToPolicyMap.put(RemoteException.class,
+        RetryPolicies.retryByRemoteException(
+            RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
+    RetryPolicy methodPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+    Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
+
+    methodNameToPolicyMap.put("create", methodPolicy);
+
+    return (RaidProtocol) RetryProxy.create(RaidProtocol.class,
+        rpcRaidnode, methodNameToPolicyMap);
+  }
+
+  private void checkOpen() throws IOException {
+    if (!clientRunning) {
+      throw new IOException("RaidNode closed");
+    }
+  }
+
+  /**
+   * Close the connection to the raidNode.
+   */
+  public synchronized void close() throws IOException {
+    if(clientRunning) {
+      clientRunning = false;
+      RPC.stopProxy(rpcRaidnode);
+    }
+  }
+
+  /**
+   * Displays format of commands.
+   */
+  private static void printUsage(String cmd) {
+    String prefix = "Usage: java " + RaidShell.class.getSimpleName();
+    if ("-showConfig".equals(cmd)) {
+      System.err.println("Usage: java RaidShell" + 
+                         " [-showConfig]"); 
+    } else if ("-recover".equals(cmd)) {
+      System.err.println("Usage: java CronShell" +
+                         " [-recover srcPath1 corruptOffset]");
+    } else {
+      System.err.println("Usage: java RaidShell");
+      System.err.println("           [-showConfig ]");
+      System.err.println("           [-help [cmd]]");
+      System.err.println("           [-recover srcPath1 corruptOffset]");
+      System.err.println();
+      ToolRunner.printGenericCommandUsage(System.err);
+    }
+  }
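+  // Example invocations (illustrative only; paths and offsets are made up):
+  //   bin/hadoop org.apache.hadoop.raid.RaidShell -showConfig
+  //   bin/hadoop org.apache.hadoop.raid.RaidShell -recover /user/foo/bar 0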
+
+  /**
+   * run
+   */
+  public int run(String argv[]) throws Exception {
+
+    if (argv.length < 1) {
+      printUsage("");
+      return -1;
+    }
+
+    int exitCode = -1;
+    int i = 0;
+    String cmd = argv[i++];
+    //
+    // verify that we have enough command line parameters
+    //
+    if ("-showConfig".equals(cmd)) {
+      if (argv.length < 1) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    } else if ("-recover".equals(cmd)) {
+      if (argv.length < 3) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    }
+
+    try {
+      if ("-showConfig".equals(cmd)) {
+        exitCode = showConfig(cmd, argv, i);
+      } else if ("-recover".equals(cmd)) {
+        exitCode = recover(cmd, argv, i);
+      } else {
+        exitCode = -1;
+        System.err.println(cmd.substring(1) + ": Unknown command");
+        printUsage("");
+      }
+    } catch (IllegalArgumentException arge) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+      printUsage(cmd);
+    } catch (RemoteException e) {
+      //
+      // This is an error returned by the raidnode server. Print
+      // out the first line of the error message, ignore the stack trace.
+      exitCode = -1;
+      try {
+        String[] content;
+        content = e.getLocalizedMessage().split("\n");
+        System.err.println(cmd.substring(1) + ": " +
+                           content[0]);
+      } catch (Exception ex) {
+        System.err.println(cmd.substring(1) + ": " +
+                           ex.getLocalizedMessage());
+      }
+    } catch (IOException e) {
+      //
+      // IO exception encountered locally.
+      // 
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " +
+                         e.getLocalizedMessage());
+    } catch (Exception re) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + re.getLocalizedMessage());
+    } finally {
+    }
+    return exitCode;
+  }
+
+  /**
+   * Display all raid policies configured on the RaidNode.
+   */
+  private int showConfig(String cmd, String argv[], int startindex) throws IOException {
+    int exitCode = 0;
+    int i = startindex;
+    PolicyList[] all = raidnode.getAllPolicies();
+    for (PolicyList list: all) {
+      for (PolicyInfo p : list.getAll()) {
+        System.out.println(p);
+      }
+    }
+    return exitCode;
+  }
+
+  /**
+   * Recovers the specified path from the parity file
+   */
+  public int recover(String cmd, String argv[], int startindex)
+    throws IOException {
+    int exitCode = 0;
+    String[] paths = new String[argv.length - startindex];
+    for (int i = startindex; i < argv.length; i = i + 2) {
+      String path = argv[i];
+      long corruptOffset = Long.parseLong(argv[i+1]);
+      LOG.info("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
+      raidnode.recoverFile(path, corruptOffset);
+    }
+    return 0;
+  }
+
+  /**
+   * main() runs the RaidShell command-line tool.
+   */
+  public static void main(String argv[]) throws Exception {
+    RaidShell shell = null;
+    try {
+      shell = new RaidShell();
+    } catch (RPC.VersionMismatch v) {
+      System.err.println("Version Mismatch between client and server" +
+                         "... command aborted.");
+      System.exit(-1);
+    } catch (IOException e) {
+      System.err.println("Bad connection to RaidNode. command aborted.");
+      System.exit(-1);
+    }
+
+    int res;
+    try {
+      res = ToolRunner.run(shell, argv);
+    } finally {
+      shell.close();
+    }
+    System.exit(res);
+  }
+}

Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid.protocol;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.Enumeration;
+import java.lang.Math;
+import java.lang.Class;
+import java.text.SimpleDateFormat;
+import java.util.StringTokenizer;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
+
+/**
+ * Maintains information about one policy
+ */
+public class PolicyInfo implements Writable {
+  public static final Log LOG = LogFactory.getLog(
+    "org.apache.hadoop.raid.protocol.PolicyInfo");
+  protected static final SimpleDateFormat dateFormat =
+    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+  private Path srcPath;            // the specified src path
+  private String policyName;       // name of policy
+  private String destinationPath;  // A destination path for this policy
+  private String description;      // A verbose description of this policy
+  private Configuration conf;      // Hadoop configuration
+
+  private Properties properties;   // Policy-dependent properties
+
+  private ReentrantReadWriteLock plock; // protects policy operations.
+  
+  /**
+   * Create an empty policy object (used by the Writable deserialization code).
+   */
+  public PolicyInfo() {
+    this.conf = null;
+    this.policyName = "";
+    this.description = "";
+    this.srcPath = null;
+    this.properties = new Properties();
+    this.plock = new ReentrantReadWriteLock();
+  }
+
+  /**
+   * Create the metadata that describes a policy
+   */
+  public PolicyInfo(String  policyName, Configuration conf) {
+    this.conf = conf;
+    this.policyName = policyName;
+    this.description = "";
+    this.srcPath = null;
+    this.properties = new Properties();
+    this.plock = new ReentrantReadWriteLock();
+  }
+
+  /**
+   * Sets the input path on which this policy has to be applied
+   */
+  public void setSrcPath(String in) throws IOException {
+    srcPath = new Path(in);
+    srcPath = srcPath.makeQualified(srcPath.getFileSystem(conf));
+  }
+
+  /**
+   * Set the destination path of this policy.
+   */
+  public void setDestinationPath(String des) {
+    this.destinationPath = des;
+  }
+
+  /**
+   * Set the description of this policy.
+   */
+  public void setDescription(String des) {
+    this.description = des;
+  }
+
+  /**
+   * Sets an internal property.
+   * @param name property name.
+   * @param value property value.
+   */
+  public void setProperty(String name, String value) {
+    properties.setProperty(name, value);
+  }
+  
+  /**
+   * Returns the value of an internal property.
+   * @param name property name.
+   */
+  public String getProperty(String name) {
+    return properties.getProperty(name);
+  }
+  
+  /**
+   * Get the name of this policy.
+   */
+  public String getName() {
+    return this.policyName;
+  }
+
+  /**
+   * Get the destination path of this policy.
+   */
+  public String getDestinationPath() {
+    return this.destinationPath;
+  }
+
+  /**
+   * Get the srcPath
+   */
+  public Path getSrcPath() throws IOException {
+    return srcPath;
+  }
+
+  /**
+   * Get the expanded (glob-resolved) forms of the srcPath
+   */
+  public Path[] getSrcPathExpanded() throws IOException {
+    FileSystem fs = srcPath.getFileSystem(conf);
+
+    // globbing on srcPath
+    FileStatus[] gpaths = fs.globStatus(srcPath);
+    if (gpaths == null) {
+      return null;
+    }
+    Path[] values = new Path[gpaths.length];
+    for (int i = 0; i < gpaths.length; i++) {
+      Path p = gpaths[i].getPath();
+      values[i] = p.makeQualified(fs);
+    }
+    return values;
+  }
+
+  /**
+   * Convert this policy into a printable form
+   */
+  public String toString() {
+    StringBuffer buff = new StringBuffer();
+    buff.append("Policy Name:\t" + policyName + " --------------------\n");
+    buff.append("Source Path:\t" + srcPath + "\n");
+    buff.append("Dest Path:\t" + destinationPath + "\n");
+    for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
+      String name = (String) e.nextElement(); 
+      buff.append( name + ":\t" + properties.getProperty(name) + "\n");
+    }
+    if (description.length() > 0) {
+      int len = Math.min(description.length(), 80);
+      String sub = description.substring(0, len).trim();
+      sub = sub.replaceAll("\n", " ");
+      buff.append("Description:\t" + sub + "...\n");
+    }
+    return buff.toString();
+  }
+
+  //////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (PolicyInfo.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new PolicyInfo(); }
+       });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, srcPath.toString());
+    Text.writeString(out, policyName);
+    Text.writeString(out, destinationPath);
+    Text.writeString(out, description);
+    out.writeInt(properties.size());
+    for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
+      String name = (String) e.nextElement(); 
+      Text.writeString(out, name);
+      Text.writeString(out, properties.getProperty(name));
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    this.srcPath = new Path(Text.readString(in));
+    this.policyName = Text.readString(in);
+    this.destinationPath = Text.readString(in);
+    this.description = Text.readString(in);
+    for (int n = in.readInt(); n>0; n--) {
+      String name = Text.readString(in);
+      String value = Text.readString(in);
+      properties.setProperty(name,value);
+    }
+  }
+}

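PolicyInfo travels between the RaidShell and the RaidNode through the Writable methods above. A minimal round-trip sketch, assuming the Hadoop jars are on the classpath and the default (local) FileSystem is used to qualify the source path; the policy name, paths and property are illustrative only:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.raid.protocol.PolicyInfo;

  public class PolicyInfoRoundTrip {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();      // default (local) FileSystem

      PolicyInfo info = new PolicyInfo("RaidTest1", conf);
      info.setSrcPath("/user/foo/raidtest");         // qualified against the FileSystem
      info.setDestinationPath("/destraid");
      info.setProperty("targetReplication", "1");

      // Serialize through the Writable interface ...
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      info.write(new DataOutputStream(bos));

      // ... and read it back into a fresh instance.
      PolicyInfo copy = new PolicyInfo();
      copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
      System.out.println(copy);
    }
  }
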
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid.protocol;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Collection;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Maintains information about all policies that belong to a category.
+ * These policies have to be applied one-at-a-time and cannot be run
+ * simultaneously.
+ */
+public class PolicyList implements Writable {
+  public static final Log LOG = LogFactory.getLog(
+    "org.apache.hadoop.raid.protocol.PolicyList");
+
+  private List<PolicyInfo> category; // list of policies
+  private Path srcPath;
+  
+  /**
+   * Create a new category of policies.
+   */
+  public PolicyList() {
+    this.category = new LinkedList<PolicyInfo>();
+    this.srcPath = null;
+  }
+
+  /**
+   * Add a new policy to this category.
+   */
+  public void add(PolicyInfo info) {
+    category.add(info); 
+  }
+
+  public void setSrcPath(Configuration conf, String src) throws IOException {
+    srcPath = new Path(src);
+    srcPath = srcPath.makeQualified(srcPath.getFileSystem(conf));
+  }
+  
+  public Path getSrcPath() {
+    return srcPath;
+  }
+  
+  /**
+   * Returns the policies in this category
+   */
+  public Collection<PolicyInfo> getAll() {
+    return category;
+  }
+
+  /**
+   * Sort categories based on their srcPath, in reverse lexicographic order.
+   */
+  public static class CompareByPath implements Comparator<PolicyList> {
+    public CompareByPath() throws IOException {
+    }
+    public int compare(PolicyList l1, PolicyList l2) {
+      return 0 - l1.getSrcPath().compareTo(l2.getSrcPath());
+    }
+  }
+  
+  
+  //////////////////////////////////////////////////
+  // Writable
+  //////////////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (PolicyList.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new PolicyList(); }
+       });
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(category.size());
+    for (PolicyInfo p : category) {
+      p.write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    int count = in.readInt();
+    for (int i = 0; i < count; i++) {
+      PolicyInfo p = new PolicyInfo();
+      p.readFields(in);
+      add(p);
+    }
+  }
+}

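The CompareByPath comparator above orders categories by their qualified srcPath in reverse lexicographic order. A small sketch with illustrative paths, again assuming the default (local) FileSystem:

  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.List;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.raid.protocol.PolicyList;

  public class PolicyListSortSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();

      PolicyList a = new PolicyList();
      a.setSrcPath(conf, "/user/a");
      PolicyList b = new PolicyList();
      b.setSrcPath(conf, "/user/b");

      List<PolicyList> lists = new ArrayList<PolicyList>();
      lists.add(a);
      lists.add(b);

      // Reverse lexicographic order: the category with the "larger"
      // srcPath ("/user/b" here) ends up first.
      Collections.sort(lists, new PolicyList.CompareByPath());
      for (PolicyList l : lists) {
        System.out.println(l.getSrcPath());
      }
    }
  }
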
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid.protocol;
+
+import java.util.Collection;
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.fs.Path;
+
+/**********************************************************************
+ * RaidProtocol is used by client code, for example the
+ * {@link org.apache.hadoop.raid.RaidShell} class, to communicate
+ * with the RaidNode.  User code can query the configured policies
+ * and request recovery of corrupted files.
+ **********************************************************************/
+public interface RaidProtocol extends VersionedProtocol {
+
+  /**
+   * Compared to the previous version, the following changes have been
+   * introduced (only the latest change is reflected):
+   * 1: new protocol introduced
+   */
+  public static final long versionID = 1L;
+
+  /**
+   * A set of codes returned by RPC calls.
+   */
+  public enum ReturnStatus {
+    SUCCESS ((int)0x01),
+    FAILURE ((int)0x02),
+    RETRY   ((int)0x03);
+    private int code;
+
+    private ReturnStatus(int code) {
+      this.code = code;
+    }
+    int getReturnStatus() {return code;}
+  }
+
+  /**
+   * Get a listing of all configured policies.
+   * @return all categories of configured policies
+   * @throws IOException
+   */
+  public PolicyList[] getAllPolicies() throws IOException;
+
+  /**
+   * Unraid the specified input path. This is called when the specified file
+   * is corrupted. This call will move the specified file to file.old
+   * and then recover it from the RAID subsystem.
+   *
+   * @param inputPath The absolute pathname of the file to be recovered.
+   * @param corruptOffset The offset that has the corruption
+   */
+  public ReturnStatus recoverFile(String inputPath, long corruptOffset) throws IOException;
+
+}

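A client obtains this interface as a Hadoop RPC proxy. A minimal client-side sketch using RPC.getProxy(protocol, versionID, address, conf); the RaidNode address and the file path are placeholders for a running deployment:

  import java.net.InetSocketAddress;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.ipc.RPC;
  import org.apache.hadoop.net.NetUtils;
  import org.apache.hadoop.raid.protocol.PolicyInfo;
  import org.apache.hadoop.raid.protocol.PolicyList;
  import org.apache.hadoop.raid.protocol.RaidProtocol;

  public class RaidProtocolClientSketch {
    public static void main(String[] args) throws Exception {
      // Placeholder address; point this at the RaidNode's RPC port.
      InetSocketAddress addr = NetUtils.createSocketAddr("localhost:60000");
      Configuration conf = new Configuration();

      RaidProtocol raidnode = (RaidProtocol) RPC.getProxy(
          RaidProtocol.class, RaidProtocol.versionID, addr, conf);
      try {
        // List every configured policy, category by category.
        for (PolicyList list : raidnode.getAllPolicies()) {
          for (PolicyInfo p : list.getAll()) {
            System.out.println(p);
          }
        }
        // Ask the RaidNode to recover a (placeholder) corrupted file.
        RaidProtocol.ReturnStatus status = raidnode.recoverFile("/user/foo/file1", 0L);
        System.out.println("recoverFile returned " + status);
      } finally {
        RPC.stopProxy(raidnode);
      }
    }
  }
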
Added: hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Tue Oct  6 07:31:11 2009
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.raid.RaidNode;
+
+public class TestRaidDfs extends TestCase {
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+  final static String CONFIG_FILE = new File(TEST_DIR, 
+      "test-raid.xml").getAbsolutePath();
+  final static long RELOAD_INTERVAL = 1000;
+  final static Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.TestRaidDfs");
+  final Random rand = new Random();
+  final static int NUM_DATANODES = 3;
+
+  Configuration conf;
+  String namenode = null;
+  String hftp = null;
+  MiniDFSCluster dfs = null;
+  FileSystem fileSys = null;
+  RaidNode cnode = null;
+  String jobTrackerName = null;
+
+  private void mySetup() throws Exception {
+
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+
+    conf.set("raid.config.file", CONFIG_FILE);
+    conf.setBoolean("raid.config.reload", true);
+    conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+    // scan all policies once every 5 seconds
+    conf.setLong("raid.policy.rescan.interval", 5000);
+
+    // do not use map-reduce cluster for Raiding
+    conf.setBoolean("fs.raidnode.local", true);
+    conf.setInt("hdfs.raid.stripeLength", 3);
+    conf.set("hdfs.raid.locations", "/destraid");
+
+    dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    namenode = fileSys.getUri().toString();
+    hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    
+    FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+    fileWriter.write("<?xml version=\"1.0\"?>\n");
+    String str = "<configuration> " +
+                   "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
+                     "<policy name = \"RaidTest1\"> " +
+                        "<destPath> /destraid</destPath> " +
+                        "<property> " +
+                          "<name>targetReplication</name> " +
+                          "<value>1</value> " + 
+                          "<description>after RAIDing, decrease the replication factor of a file to this value." +
+                          "</description> " + 
+                        "</property> " +
+                        "<property> " +
+                          "<name>metaReplication</name> " +
+                          "<value>1</value> " + 
+                          "<description> replication factor of parity file" +
+                          "</description> " + 
+                        "</property> " +
+                        "<property> " +
+                          "<name>modTimePeriod</name> " +
+                          "<value>2000</value> " + 
+                          "<description> time (milliseconds) after a file is modified to make it " +
+                                         "a candidate for RAIDing " +
+                          "</description> " + 
+                        "</property> " +
+                     "</policy>" +
+                   "</srcPath>" +
+                 "</configuration>";
+    fileWriter.write(str);
+    fileWriter.close();
+  }
+
+  private void myTearDown() throws Exception {
+    if (cnode != null) { cnode.stop(); cnode.join(); }
+    if (dfs != null) { dfs.shutdown(); }
+  }
+
+  /**
+   * Test DFS Raid
+   */
+  public void testRaidDfs() throws Exception {
+    LOG.info("Test testRaidDfs started.");
+    long blockSize = 8192L;
+    int stripeLength = 3;
+    mySetup();
+    Path file1 = new Path("/user/dhruba/raidtest/file1");
+    Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    long crc1 = createOldFile(fileSys, file1, 1, 7, blockSize);
+    LOG.info("Test testPathFilter created test files");
+
+    // create an instance of the RaidNode
+    cnode = RaidNode.createRaidNode(null, conf);
+    
+    try {
+      FileStatus[] listPaths = null;
+
+      // wait till file is raided
+      while (listPaths == null || listPaths.length != 1) {
+        LOG.info("Test testPathFilter waiting for files to be raided.");
+        try {
+          listPaths = fileSys.listStatus(destPath);
+        } catch (FileNotFoundException e) {
+          //ignore
+        }
+        Thread.sleep(1000);                  // keep waiting
+      }
+      assertEquals(listPaths.length, 1); // all files raided
+      LOG.info("Files raided so far : " + listPaths[0].getPath());
+
+      // extract block locations from File system. Wait till file is closed.
+      LocatedBlocks locations = null;
+      DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+      while (true) {
+        locations = dfs.getClient().getNamenode().getBlockLocations(file1.toString(),
+                                                               0, listPaths[0].getLen());
+        if (!locations.isUnderConstruction()) {
+          break;
+        }
+        Thread.sleep(1000);
+      }
+
+      // filter all filesystem calls from client
+      Configuration clientConf = new Configuration(conf);
+      clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+      DistributedRaidFileSystem raidfs = new DistributedRaidFileSystem(dfs);
+      raidfs.initialize(dfs.getUri(), clientConf);
+
+      // corrupt first block of file
+      LOG.info("Corrupt first block of file");
+      corruptBlock(file1, locations.get(0).getBlock());
+      validateFile(raidfs, file1, file1, crc1);
+
+    } catch (Exception e) {
+      LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      myTearDown();
+    }
+    LOG.info("Test testPathFilter completed.");
+  }
+  
+  //
+  // creates a file and populate it with random data. Returns its crc.
+  //
+  private long createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+    throws IOException {
+    CRC32 crc = new CRC32();
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, blocksize);
+    // fill random data into file
+    final byte[] b = new byte[(int)blocksize];
+    for (int i = 0; i < numBlocks; i++) {
+      rand.nextBytes(b);
+      stm.write(b);
+      crc.update(b);
+    }
+    stm.close();
+    return crc.getValue();
+  }
+
+  //
+  // validates that file matches the crc.
+  //
+  private void validateFile(FileSystem fileSys, Path name1, Path name2, long crc) 
+    throws IOException {
+
+    FileStatus stat1 = fileSys.getFileStatus(name1);
+    FileStatus stat2 = fileSys.getFileStatus(name2);
+    assertTrue(" Length of file " + name1 + " is " + stat1.getLen() + 
+               " is different from length of file " + name1 + " " + stat2.getLen(),
+               stat1.getLen() == stat2.getLen());
+
+    CRC32 newcrc = new CRC32();
+    FSDataInputStream stm = fileSys.open(name2);
+    final byte[] b = new byte[4192];
+    int num = 0;
+    while (num >= 0) {
+      num = stm.read(b);
+      if (num < 0) {
+        break;
+      }
+      newcrc.update(b, 0, num);
+    }
+    stm.close();
+    LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
+    if (newcrc.getValue() != crc) {
+      fail("CRC mismatch of files " + name1 + " with file " + name2);
+    }
+  }
+
+  /*
+   * The Data directories for a datanode
+   */
+  private File[] getDataNodeDirs(int i) throws IOException {
+    File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
+    File data_dir = new File(base_dir, "data");
+    File dir1 = new File(data_dir, "data"+(2*i+1));
+    File dir2 = new File(data_dir, "data"+(2*i+2));
+    if (dir1.isDirectory() && dir2.isDirectory()) {
+      File[] dir = new File[2];
+      dir[0] = new File(dir1, "current");
+      dir[1] = new File(dir2, "current"); 
+      return dir;
+    }
+    return new File[0];
+  }
+
+  //
+  // Corrupt specified block of file
+  //
+  void corruptBlock(Path file, Block blockNum) throws IOException {
+    long id = blockNum.getBlockId();
+
+    // Now deliberately remove/truncate data blocks from the block.
+    //
+    for (int i = 0; i < NUM_DATANODES; i++) {
+      File[] dirs = getDataNodeDirs(i);
+      
+      for (int j = 0; j < dirs.length; j++) {
+        File[] blocks = dirs[j].listFiles();
+        assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
+        for (int idx = 0; idx < blocks.length; idx++) {
+          if (blocks[idx].getName().startsWith("blk_" + id) &&
+              !blocks[idx].getName().endsWith(".meta")) {
+            blocks[idx].delete();
+            LOG.info("Deleted block " + blocks[idx]);
+          }
+        }
+      }
+    }
+  }
+
+}