Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/02/17 00:40:42 UTC

svn commit: r910774 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java

Author: dhruba
Date: Tue Feb 16 23:40:42 2010
New Revision: 910774

URL: http://svn.apache.org/viewvc?rev=910774&view=rev
Log:
MAPREDUCE-1491. The parity files created by the RAID are combined
using Hadoop Archive Files (HAR).  (Rodrigo Schmidt via dhruba)
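
For orientation only (not part of this change): once a parity file has been
folded into a HAR, it stays readable through Hadoop's har:// filesystem, which
is what the new RaidNode.getParityFile() lookup below relies on. A minimal
sketch, with hypothetical paths, assuming the archive was created with parent
"/" the way singleHar() in this patch does:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HarParityReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical archive holding the parity files of /raid/user/test/raidtest;
        // entries keep their full original path inside the archive.
        Path har = new Path("/raid/user/test/raidtest/raidtest_raid.har");
        Path inHar = new Path("har://" + har.toUri().getPath()
            + "/raid/user/test/raidtest/file0");
        FileSystem harFs = inHar.getFileSystem(conf);  // resolves to the HAR filesystem
        if (harFs.exists(inHar)) {
          FSDataInputStream in = harFs.open(inHar);    // read like any other file
          in.close();
        }
      }
    }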


Added:
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=910774&r1=910773&r2=910774&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Feb 16 23:40:42 2010
@@ -345,8 +345,6 @@
     instantiation and initialization of the DistributedRaidFileSystem. 
     (Rodrigo Schmidt via dhruba)
 
-    cdouglas)
-
     MAPREDUCE-1476. Fix the M/R framework to not call commit for special
     tasks like job setup/cleanup and task cleanup.
     (Amareshwari Sriramadasu via yhemanth)
@@ -357,6 +355,9 @@
 
     MAPREDUCE-1444. Sqoop ConnManager instances can leak Statement objects.
     (Aaron Kimball via tomwhite)
+
+    MAPREDUCE-1491. The parity files created by the RAID are combined
+    using Hadoop Archive Files (HAR).  (Rodrigo Schmidt via dhruba)
  
 Release 0.21.0 - Unreleased
 

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=910774&r1=910773&r2=910774&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Feb 16 23:40:42 2010
@@ -30,6 +30,8 @@
 import java.util.HashSet;
 import java.lang.Thread;
 import java.net.InetSocketAddress;
+import java.net.URI;
+
 import org.xml.sax.SAXException;
 import javax.xml.parsers.ParserConfigurationException;
 
@@ -37,8 +39,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.HadoopArchives;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileStatus;
@@ -51,7 +55,6 @@
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
 import org.apache.hadoop.raid.protocol.RaidProtocol;
-import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
 
 /**
  * A {@link RaidNode} that implements 
@@ -70,7 +73,8 @@
   public static final int DEFAULT_PORT = 60000;
   public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
   public static final String DEFAULT_RAID_LOCATION = "/raid";
-
+  public static final String HAR_SUFFIX = "_raid.har";
+  
   /** RPC server */
   private Server server;
   /** RPC server address */
@@ -92,6 +96,9 @@
 
   /** Deamon thread to delete obsolete parity files */
   Daemon purgeThread = null;
+  
+  /** Daemon thread to har raid directories */
+  Daemon harThread = null;
 
   /** Do do distributed raiding */
   boolean isRaidLocal = false;
@@ -248,8 +255,13 @@
     // start the thread that deletes obsolete parity files
     this.purgeThread = new Daemon(new PurgeMonitor());
     this.purgeThread.start();
+
+    // start the thread that creates HAR files
+    this.harThread = new Daemon(new HarMonitor());
+    this.harThread.start();
   }
 
+  
   /**
    * Implement RaidProtocol methods
    */
@@ -453,6 +465,66 @@
     return null; // no matching policies
   }
 
+  /**
+   * Returns the Path to the parity file of a given file
+   * 
+   * @param destPathPrefix Destination prefix defined by some policy
+   * @param srcPath Path to the original source file
+   * @param create Boolean value telling whether a new parity file should be created
+   * @return Path object representing the parity file of the source
+   * @throws IOException
+   */
+  static private Path getParityFile(Path destPathPrefix, Path srcPath, boolean create, Configuration conf) throws IOException {
+    Path srcParent = srcPath.getParent();
+
+    FileSystem fsDest = destPathPrefix.getFileSystem(conf);
+
+    Path outDir = destPathPrefix;
+    if (srcParent != null) {
+      if (srcParent.getParent() == null) {
+        outDir = destPathPrefix;
+      } else {
+        outDir = new Path(destPathPrefix, makeRelative(srcParent));
+      }
+    }
+
+    String harDirName = srcParent.getName() + HAR_SUFFIX; 
+    Path HarPath = new Path(outDir,harDirName);
+    Path outPath =  new Path(destPathPrefix, makeRelative(srcPath));
+
+    if (create || !fsDest.exists(HarPath)) {  // case 1: no HAR file
+      return outPath;
+    }
+
+    URI HarPathUri = HarPath.toUri();
+    Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
+    FileSystem fsHar = inHarPath.getFileSystem(conf);
+    
+    if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR
+      return outPath;
+    }
+
+    if (! fsDest.exists(outPath)) { // case 3: only inside HAR
+      return inHarPath;
+    }
+
+    // both inside and outside HAR. Should return most recent
+    FileStatus inHar = fsHar.getFileStatus(inHarPath);
+    FileStatus outHar = fsDest.getFileStatus(outPath);
+
+    if (inHar.getModificationTime() >= outHar.getModificationTime()) {
+      return inHarPath;
+    }
+
+    return outPath;
+  }
+  
+  private Path getParityFile(Path destPathPrefix, Path srcPath, boolean create) throws IOException {
+
+    return getParityFile(destPathPrefix, srcPath, create, conf);
+
+  }
+  
  /**
   * Returns a list of pathnames that needs raiding.
   */
@@ -520,7 +592,7 @@
       // does not match the modTime of the source file, then recalculate RAID
       boolean add = false;
       try {
-        Path outpath =  new Path(destPathPrefix, makeRelative(path));
+        Path outpath =  getParityFile(destPathPrefix, path, false);
         FileSystem outFs = outpath.getFileSystem(conf);
         FileStatus ostat = outFs.getFileStatus(outpath);
         if (ostat.getModificationTime() != src.getModificationTime() &&
@@ -538,7 +610,9 @@
 
     } else if (files != null) {
       for (FileStatus one:files) {
-        recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
+        if (!one.getPath().getName().endsWith(HAR_SUFFIX)){
+          recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
+        }
       }
     }
   }
@@ -678,7 +752,7 @@
     long fileSize = stat.getLen();
 
     // create output tmp path
-    Path outpath =  new Path(destPathPrefix, makeRelative(inpath));
+    Path outpath =  getParityFile(destPathPrefix, inpath,true,conf);
     Path tmppath =  new Path(outpath + ".tmp");
     FileSystem outFs = outpath.getFileSystem(conf);
 
@@ -688,6 +762,7 @@
       if (stmp.getModificationTime() == stat.getModificationTime()) {
         LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath +
                  " already upto-date. Nothing more to do.");
+        return;
       }
     } catch (IOException e) {
       // ignore errors because the raid file might not exist yet.
@@ -891,7 +966,7 @@
     LOG.info("Decompose a total of " + numLength + " blocks.");
 
     // open and seek to the appropriate offset in parity file.
-    Path parityFile =  new Path(destPathPrefix, makeRelative(srcPath));
+    Path parityFile =  getParityFile(destPathPrefix, srcPath, false, conf);
     FileSystem parityFs = parityFile.getFileSystem(conf);
     LOG.info("Parity file for " + srcPath + " is " + parityFile);
     ins[numLength-1] = parityFs.open(parityFile);
@@ -905,10 +980,11 @@
     // We need to generate a unique name for this tmp file later on.
     Path tmpFile = null;
     FSDataOutputStream fout = null;
+    FileSystem destFs = destPathPrefix.getFileSystem(conf);
     int retry = 5;
     try {
       tmpFile = new Path("/tmp/dhruba/" + rand.nextInt());
-      fout = parityFs.create(tmpFile, false);
+      fout = destFs.create(tmpFile, false);
     } catch (IOException e) {
       if (retry-- <= 0) {
         LOG.info("Unable to create temporary file " + tmpFile +
@@ -935,17 +1011,17 @@
 
     // Now, reopen the source file and the recovered block file
     // and copy all relevant data to new file
-    Path recoveredPath =  new Path(destPathPrefix, makeRelative(srcPath));
+    Path recoveredPath =  getParityFile(destPathPrefix, srcPath, true, conf);
     recoveredPath = new Path(recoveredPath + ".recovered");
     LOG.info("Creating recovered file " + recoveredPath);
 
     FSDataInputStream sin = srcFs.open(srcPath);
-    FSDataOutputStream out = parityFs.create(recoveredPath, false, 
+    FSDataOutputStream out = destFs.create(recoveredPath, false, 
                                              conf.getInt("io.file.buffer.size", 64 * 1024),
                                              srcStat.getReplication(), 
                                              srcStat.getBlockSize());
 
-    FSDataInputStream bin = parityFs.open(tmpFile);
+    FSDataInputStream bin = destFs.open(tmpFile);
     long recoveredSize = 0;
 
     // copy all the good blocks (upto the corruption)
@@ -992,12 +1068,12 @@
     bin.close();
 
     // delete the temporary block file that was created.
-    parityFs.delete(tmpFile, false);
+    destFs.delete(tmpFile, false);
     LOG.info("Deleted temporary file " + tmpFile);
 
     // copy the meta information from source path to the newly created
     // recovered path
-    copyMetaInformation(parityFs, srcStat, recoveredPath);
+    copyMetaInformation(destFs, srcStat, recoveredPath);
 
     return recoveredPath;
   }
@@ -1060,8 +1136,8 @@
               FileSystem destFs = FileSystem.get(destp.toUri(), conf);
               destp = destp.makeQualified(destFs);
               destinationPrefix = destp.toUri().getPath(); // strip host:port
-              destp = new Path(destp, makeRelative(info.getSrcPath()));
-
+              destp = getParityFile(destp, info.getSrcPath(), true);
+              
               // if this destination path has already been processed as part
               // of another policy, then nothing more to do
               if (processed.contains(destp)) {
@@ -1108,6 +1184,11 @@
       String destStr = destPath.toUri().getPath();
       LOG.debug("Checking " + destPath + " prefix " + destPrefix);
 
+      // Verify if it is a har file
+      if (destStr.endsWith(HAR_SUFFIX)) {
+        return;
+      }
+      
       // Verify the destPrefix is a prefix of the destPath
       if (!destStr.startsWith(destPrefix)) {
         LOG.error("Destination path " + destStr + " should have " + 
@@ -1116,10 +1197,11 @@
       }
       String src = destStr.replaceFirst(destPrefix, "");
       
-      // if the source path does not exist, then delete the 
-      // destination path as well
+      // if the source path does not exist or the parity file has been HARed, 
+      // then delete the parity file
       Path srcPath = new Path(src);
-      if (!srcFs.exists(srcPath)) {
+      Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
+      if (!srcFs.exists(srcPath) || !destPath.equals(getParityFile(dstPath,srcPath,false))) {
         boolean done = destFs.delete(destPath, false);
         if (done) {
           LOG.info("Purged path " + destPath );
@@ -1141,6 +1223,160 @@
     } 
   }
 
+  
+  private void doHar() throws IOException, InterruptedException {
+    
+    PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+
+    long prevExec = 0;
+    while (running) {
+
+      LOG.info("Started HAR thread");
+      // The config may be reloaded by the TriggerMonitor. 
+      // This thread uses whatever config is currently active.
+      while(now() < prevExec + configMgr.getPeriodicity()){
+        Thread.sleep(SLEEP_TIME);
+      }
+
+      prevExec = now();
+      
+      // fetch all categories
+      Collection<PolicyList> all = configMgr.getAllPolicies();
+            
+      // sort all policies by reverse lexicographical order. This is 
+      // needed to make the nearest policy take precedence.
+      PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+      Arrays.sort(sorted, lexi);
+
+      for (PolicyList category : sorted) {
+        for (PolicyInfo info: category.getAll()) {
+          String str = info.getProperty("time_before_har");
+          String tmpHarPath = info.getProperty("har_tmp_dir");
+          if (tmpHarPath == null) {
+            tmpHarPath = "/raid_har";
+          }
+          if (str != null) {
+            try {
+              long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
+              // expand destination prefix path
+              String destinationPrefix = getDestinationPath(conf, info);
+              Path destp = new Path(destinationPrefix.trim());
+              FileSystem destFs = FileSystem.get(destp.toUri(), conf);
+              destp = destp.makeQualified(destFs);
+              destinationPrefix = destp.toUri().getPath(); // strip host:port
+              destp = getParityFile(destp, info.getSrcPath(), true);
+
+              FileStatus stat = null;
+              try {
+                stat = destFs.getFileStatus(destp);
+              } catch (FileNotFoundException e) {
+                // do nothing, leave stat = null;
+              }
+              if (stat != null) {
+                LOG.info("Purging obsolete parity files for policy " + 
+                    info.getName() + " " + destp);
+
+                recurseHar(destFs, stat, cutoff, tmpHarPath);
+              }
+
+
+            } catch (Exception e) {
+              LOG.warn("Ignoring Exception while processing policy " + 
+                  info.getName() + " " + 
+                  StringUtils.stringifyException(e));
+            }
+          }
+        }
+      }
+    }
+    return;
+  }
+  
+  private void recurseHar(FileSystem destFs, FileStatus dest, long cutoff, String tmpHarPath)
+    throws IOException {
+
+    if (!dest.isDir()) {
+      return;
+    }
+    
+    Path destPath = dest.getPath(); // pathname, no host:port
+
+    // Verify if it already contains a HAR directory
+    if ( destFs.exists(new Path(destPath, destPath.getName()+HAR_SUFFIX)) ) {
+      return;
+    }
+
+    FileStatus[] files = null;
+    files = destFs.listStatus(destPath);
+    boolean shouldHar = false;
+    if (files != null) {
+      shouldHar = files.length > 0;
+      for (FileStatus one: files) {
+        if (one.isDir()){
+          recurseHar(destFs, one, cutoff, tmpHarPath);
+          shouldHar = false;
+        } else if (one.getModificationTime() > cutoff ) {
+          shouldHar = false;
+        }
+      }
+    }
+    if ( shouldHar ) {
+      singleHar(destFs, dest, tmpHarPath);
+    }
+  } 
+
+  
+  private void singleHar(FileSystem destFs, FileStatus dest, String tmpHarPath) throws IOException {
+    
+    Path root = new Path("/");
+    Path qualifiedPath = dest.getPath().makeQualified(destFs.getUri(),root);
+    String harFile = qualifiedPath.getName() + HAR_SUFFIX;
+    HadoopArchives har = new HadoopArchives(conf);
+    String[] args = new String[6];
+    args[0] = "-archiveName";
+    args[1] = harFile;
+    args[2] = "-p"; 
+    args[3] = root.makeQualified(destFs.getUri(),root).toString();
+    args[4] = qualifiedPath.toUri().getPath().substring(1);
+    args[5] = tmpHarPath.toString();
+    int ret = 0;
+    try {
+      ret = ToolRunner.run(har, args);
+      if (ret == 0 && !destFs.rename(new Path(tmpHarPath+"/"+harFile), 
+                                     new Path(qualifiedPath, harFile))){
+        LOG.info("HAR rename didn't succeed");
+        ret = -2;
+      }
+    } catch (Exception exc) {
+      throw new IOException("Error while creating archive " + ret, exc);
+    }
+    
+    if (ret != 0){
+      throw new IOException("Error while creating archive " + ret);
+    }
+    return;
+  }
+  
+  /**
+   * Periodically generates HAR files
+   */
+  class HarMonitor implements Runnable {
+
+    public void run() {
+      while (running) {
+        try {
+          doHar();
+        } catch (Exception e) {
+          LOG.error(StringUtils.stringifyException(e));
+        } finally {
+          LOG.info("Har parity files thread continuing to run...");
+        }
+      }
+    }
+    
+
+  }  
+  
   /**
    * If the config file has an entry for hdfs.raid.locations, then that overrides
    * destination path specified in the raid policy file
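
For reference, singleHar() above drives the HadoopArchives tool through
ToolRunner. For a hypothetical parity directory /raid/user/test/raidtest and
the default temporary directory /raid_har, the invocation corresponds roughly
to the command line

    hadoop archive -archiveName raidtest_raid.har -p / raid/user/test/raidtest /raid_har

after which /raid_har/raidtest_raid.har is renamed into the archived directory
itself, i.e. /raid/user/test/raidtest/raidtest_raid.har.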

Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=910774&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Tue Feb 16 23:40:42 2010
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+/**
+ * Verify that parity files created by the RaidNode get combined into HAR archives.
+ */
+public class TestRaidHar extends TestCase {
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/raid/test/data")).getAbsolutePath();
+  final static String CONFIG_FILE = new File(TEST_DIR, 
+      "test-raid.xml").getAbsolutePath();
+  final static long RELOAD_INTERVAL = 1000;
+  final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidHar");
+  final Random rand = new Random();
+
+  {
+    ((Log4JLogger)RaidNode.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+
+  Configuration conf;
+  String namenode = null;
+  String hftp = null;
+  MiniDFSCluster dfs = null;
+  MiniMRCluster mr = null;
+  FileSystem fileSys = null;
+  String jobTrackerName = null;
+
+  /**
+   * create mapreduce and dfs clusters
+   */
+  private void createClusters(boolean local) throws Exception {
+
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    conf = new Configuration();
+    conf.set("raid.config.file", CONFIG_FILE);
+    conf.setBoolean("raid.config.reload", true);
+    conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+    // scan all policies once every 5 second
+    conf.setLong("raid.policy.rescan.interval", 5000);
+
+    // make all deletions not go through Trash
+    conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+    // the RaidNode does the raiding inline (instead of submitting to map/reduce)
+    conf.setBoolean("fs.raidnode.local", local);
+
+    // create a dfs and map-reduce cluster
+    final int taskTrackers = 4;
+
+    dfs = new MiniDFSCluster(conf, 3, true, null);
+    dfs.waitActive();
+    fileSys = dfs.getFileSystem();
+    namenode = fileSys.getUri().toString();
+    mr = new MiniMRCluster(taskTrackers, namenode, 3);
+    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+    hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", jobTrackerName);
+  }
+    
+  /**
+   * create raid.xml file for RaidNode
+   */
+  private void mySetup(long targetReplication,
+                long metaReplication, long stripeLength) throws Exception {
+    FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+    fileWriter.write("<?xml version=\"1.0\"?>\n");
+    String str = "<configuration> " +
+                   "<srcPath prefix=\"/user/test/raidtest\"> " +
+                     "<policy name = \"RaidTest1\"> " +
+                        "<destPath> /destraid</destPath> " +
+                        "<property> " +
+                          "<name>targetReplication</name> " +
+                          "<value>" + targetReplication + "</value> " +
+                          "<description>after RAIDing, decrease the replication factor of a file to this value." +
+                          "</description> " + 
+                        "</property> " +
+                        "<property> " +
+                          "<name>metaReplication</name> " +
+                          "<value>" + metaReplication + "</value> " +
+                          "<description> replication factor of parity file" +
+                          "</description> " + 
+                        "</property> " +
+                        "<property> " +
+                          "<name>stripeLength</name> " +
+                          "<value>" + stripeLength + "</value> " +
+                          "<description> the max number of blocks in a file to RAID together " +
+                          "</description> " + 
+                        "</property> " +
+                        "<property> " +
+                          "<name>time_before_har</name> " +
+                          "<value>0</value> " +
+                          "<description> amount of time waited before har'ing parity files" +
+                          "</description> " + 
+                        "</property> " +
+                        "<property> " +
+                          "<name>modTimePeriod</name> " +
+                          "<value>2000</value> " + 
+                          "<description> time (milliseconds) after a file is modified to make it " +
+                                         "a candidate for RAIDing " +
+                          "</description> " + 
+                        "</property> " +
+                     "</policy>" +
+                   "</srcPath>" +
+                 "</configuration>";
+    fileWriter.write(str);
+    fileWriter.close();
+  }
+
+  /**
+   * stop clusters created earlier
+   */
+  private void stopClusters() throws Exception {
+    if (mr != null) { mr.shutdown(); }
+    if (dfs != null) { dfs.shutdown(); }
+  }
+
+  /**
+   * Test that parity files get combined into a single HAR archive
+   * once they are old enough.
+   */
+  public void testRaidHar() throws Exception {
+    LOG.info("Test testRaidHar  started.");
+
+    long blockSizes    []  = {1024L};
+    long stripeLengths []  = {5};
+    long targetReplication = 1;
+    long metaReplication   = 1;
+    int  numBlock          = 9;
+    int  iter = 0;
+
+    createClusters(true);
+    try {
+      for (long blockSize : blockSizes) {
+        for (long stripeLength : stripeLengths) {
+           doTestHar(iter, targetReplication, metaReplication,
+                       stripeLength, blockSize, numBlock);
+           iter++;
+        }
+      }
+    } finally {
+      stopClusters();
+    }
+    LOG.info("Test testRaidHar completed.");
+  }
+
+  /**
+   * Create a file, wait for its parity files to be created, and then
+   * validate that they are combined into a HAR archive.
+   */
+  private void doTestHar(int iter, long targetReplication,
+                          long metaReplication, long stripeLength,
+                          long blockSize, int numBlock) throws Exception {
+    LOG.info("doTestHar started---------------------------:" +  " iter " + iter +
+             " blockSize=" + blockSize + " stripeLength=" + stripeLength);
+    mySetup(targetReplication, metaReplication, stripeLength);
+    RaidShell shell = null;
+    Path dir = new Path("/user/test/raidtest/");
+    Path file1 = new Path(dir + "/file" + iter);
+    RaidNode cnode = null;
+    try {
+      Path destPath = new Path("/destraid/user/test/raidtest");
+      fileSys.delete(dir, true);
+      fileSys.delete(destPath, true);
+      TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize);
+      LOG.info("doTestHar created test files for iteration " + iter);
+
+      // create an instance of the RaidNode
+      cnode = RaidNode.createRaidNode(null, conf);
+      int times = 10;
+
+      while (times-- > 0) {
+        try {
+          shell = new RaidShell(conf);
+        } catch (Exception e) {
+          LOG.info("doTestHar unable to connect to " + RaidNode.getAddress(conf) +
+                   " retrying....");
+          Thread.sleep(1000);
+          continue;
+        }
+        break;
+      }
+      LOG.info("doTestHar created RaidShell.");
+      FileStatus[] listPaths = null;
+
+      // wait till file is raided
+      while (true) {
+        try {
+          listPaths = fileSys.listStatus(destPath);
+          int count = 0;
+          if (listPaths != null) {
+            for (FileStatus s : listPaths) {
+              LOG.info("doTestHar found path " + s.getPath());
+              if (s.getPath().toString().endsWith(".har")) {
+                count++;
+              }
+            }
+          }
+          if (count == 1  && listPaths.length == 1) {
+            break;
+          }
+        } catch (FileNotFoundException e) {
+          //ignore
+        }
+        LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " + 
+                 (listPaths == null ? "none" : listPaths.length));
+        Thread.sleep(1000);                  // keep waiting
+      }
+      
+      
+    } catch (Exception e) {
+      LOG.info("doTestHar Exception " + e +
+                                          StringUtils.stringifyException(e));
+      throw e;
+    } finally {
+      if (shell != null) { shell.close(); }
+      if (cnode != null) { cnode.stop(); cnode.join(); }
+      LOG.info("doTestHar delete file " + file1);
+      fileSys.delete(file1, true);
+    }
+    LOG.info("doTestHar completed:" + " blockSize=" + blockSize +
+             " stripeLength=" + stripeLength);
+  }
+}