You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sc...@apache.org on 2010/11/23 00:44:34 UTC

svn commit: r1037944 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java

Author: schen
Date: Mon Nov 22 23:44:34 2010
New Revision: 1037944

URL: http://svn.apache.org/viewvc?rev=1037944&view=rev
Log:
MAPREDUCE-2184. Port DistRaid.java to new mapreduce API. (Ramkumar Vadali via
schen)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
    hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1037944&r1=1037943&r2=1037944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Nov 22 23:44:34 2010
@@ -190,6 +190,9 @@ Release 0.22.0 - Unreleased
 
     MAPREDUCE-1931. Gridmix forrest documentation . (Ranjit Mathew via vinodkv).
 
+    MAPREDUCE-2184. Port DistRaid.java to new mapreduce API. (Ramkumar Vadali
+    via schen)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=1037944&r1=1037943&r2=1037944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Mon Nov 22 23:44:34 2010
@@ -28,68 +28,48 @@ import java.text.SimpleDateFormat;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Reader;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.raid.RaidNode.Statistics;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.util.StringUtils;
 
-public class DistRaid {
+public class DistRaid extends Configured {
 
   protected static final Log LOG = LogFactory.getLog(DistRaid.class);
 
   static final String NAME = "distRaid";
   static final String JOB_DIR_LABEL = NAME + ".job.dir";
-  static final String OP_LIST_LABEL = NAME + ".op.list";
-  static final String OP_COUNT_LABEL = NAME + ".op.count";
   static final int   OP_LIST_BLOCK_SIZE = 32 * 1024 * 1024; // block size of control file
   static final short OP_LIST_REPLICATION = 10; // replication factor of control file
 
-  private static final long OP_PER_MAP = 100;
-  private static final int MAX_MAPS_PER_NODE = 20;
+  public static final String OPS_PER_TASK = "raid.distraid.opspertask";
+  private static final int DEFAULT_OPS_PER_TASK = 100;
   private static final int SYNC_FILE_MAX = 10;
   private static final SimpleDateFormat dateForm = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-  private static String jobName = NAME;
 
   static enum Counter {
     FILES_SUCCEEDED, FILES_FAILED, PROCESSED_BLOCKS, PROCESSED_SIZE, META_BLOCKS, META_SIZE
   }
 
-  protected JobConf jobconf;
-
-  /** {@inheritDoc} */
-  public void setConf(Configuration conf) {
-    if (jobconf != conf) {
-      jobconf = conf instanceof JobConf ? (JobConf) conf : new JobConf(conf);
-    }
-  }
-
-  /** {@inheritDoc} */
-  public JobConf getConf() {
-    return jobconf;
-  }
-
   public DistRaid(Configuration conf) {
-    setConf(createJobConf(conf));
+    super(conf);
   }
 
   private static final Random RANDOM = new Random();
@@ -99,9 +79,9 @@ public class DistRaid {
   }
 
   /**
-   * 
+   *
    * helper class which holds the policy and paths
-   * 
+   *
    */
   public static class RaidPolicyPathPair {
     public PolicyInfo policy;
@@ -115,127 +95,102 @@ public class DistRaid {
 
   List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
 
-  private JobClient jobClient;
-  private RunningJob runningJob;
-  private int jobEventCounter = 0;
+  private Job runningJob;
   private String lastReport = null;
 
   /** Responsible for generating splits of the src file list. */
-  static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
-    /** Do nothing. */
-    public void validateInput(JobConf job) {
-    }
-
+  static class DistRaidInputFormat extends 
+		  SequenceFileInputFormat<Text, PolicyInfo> {
     /**
      * Produce splits such that each is no greater than the quotient of the
      * total size and the number of splits requested.
-     * 
+     *
      * @param job
-     *          The handle to the JobConf object
+     *          The JobContext supplying the Configuration and input paths
      * @param numSplits
      *          Number of splits requested
      */
-    public InputSplit[] getSplits(JobConf job, int numSplits)
-        throws IOException {
-      final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
-      final int targetcount = srcCount / numSplits;
-      String srclist = job.get(OP_LIST_LABEL, "");
-      if (srcCount < 0 || "".equals(srclist)) {
-        throw new RuntimeException("Invalid metadata: #files(" + srcCount
-            + ") listuri(" + srclist + ")");
-      }
-      Path srcs = new Path(srclist);
-      FileSystem fs = srcs.getFileSystem(job);
-
-      List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+    public List<InputSplit> getSplits(JobContext job) throws IOException {
+      Configuration conf = job.getConfiguration();
 
-      Text key = new Text();
-      PolicyInfo value = new PolicyInfo();
-      SequenceFile.Reader in = null;
+      // We create only one input file. So just get the first file in the first
+      // input directory.
+      Path inDir = getInputPaths(job)[0];
+      FileSystem fs = inDir.getFileSystem(conf);
+      FileStatus[] inputFiles = fs.listStatus(inDir);
+      Path inputFile = inputFiles[0].getPath();
+
+      List<InputSplit> splits = new ArrayList<InputSplit>();
+      SequenceFile.Reader in =
+        new SequenceFile.Reader(conf, Reader.file(inputFile));
       long prev = 0L;
-      int count = 0; // count src
+      final int opsPerTask = conf.getInt(OPS_PER_TASK, DEFAULT_OPS_PER_TASK);
       try {
-        for (in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value);) {
+        Text key = new Text();
+        PolicyInfo value = new PolicyInfo();
+        int count = 0; // count src
+        while (in.next(key, value)) {
           long curr = in.getPosition();
           long delta = curr - prev;
-          if (++count > targetcount) {
+          if (++count > opsPerTask) {
             count = 0;
-            splits.add(new FileSplit(srcs, prev, delta, (String[]) null));
+            splits.add(new FileSplit(inputFile, prev, delta, (String[]) null));
             prev = curr;
           }
         }
       } finally {
         in.close();
       }
-      long remaining = fs.getFileStatus(srcs).getLen() - prev;
+      long remaining = fs.getFileStatus(inputFile).getLen() - prev;
       if (remaining != 0) {
-        splits.add(new FileSplit(srcs, prev, remaining, (String[]) null));
+        splits.add(new FileSplit(inputFile, prev, remaining, (String[]) null));
       }
-      LOG.info("jobname= " + jobName + " numSplits=" + numSplits + 
-               ", splits.size()=" + splits.size());
-      return splits.toArray(new FileSplit[splits.size()]);
-    }
-
-    /** {@inheritDoc} */
-    public RecordReader<Text, PolicyInfo> getRecordReader(InputSplit split,
-        JobConf job, Reporter reporter) throws IOException {
-      return new SequenceFileRecordReader<Text, PolicyInfo>(job,
-          (FileSplit) split);
+      return splits;
     }
   }
 
   /** The mapper for raiding files. */
-  static class DistRaidMapper implements
-      Mapper<Text, PolicyInfo, WritableComparable, Text> {
-    private JobConf jobconf;
-    private boolean ignoreFailures;
+  static class DistRaidMapper extends Mapper<Text, PolicyInfo, Text, Text> {
+    private boolean ignoreFailures = false;
 
     private int failcount = 0;
     private int succeedcount = 0;
-    private Statistics st = null;
-    private Reporter reporter = null;
+    private Statistics st = new Statistics();
 
     private String getCountString() {
       return "Succeeded: " + succeedcount + " Failed: " + failcount;
     }
 
-    /** {@inheritDoc} */
-    public void configure(JobConf job) {
-      this.jobconf = job;
-      ignoreFailures = false;
-      st = new Statistics();
-    }
-
-    /** Run a FileOperation */
-    public void map(Text key, PolicyInfo policy,
-        OutputCollector<WritableComparable, Text> out, Reporter reporter)
-        throws IOException {
-      this.reporter = reporter;
+    /** Run a FileOperation
+     * @throws IOException
+     * @throws InterruptedException */
+    public void map(Text key, PolicyInfo policy, Context context)
+        throws IOException, InterruptedException {
       try {
+        Configuration jobConf = context.getConfiguration();
         LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
         Path p = new Path(key.toString());
-        FileStatus fs = p.getFileSystem(jobconf).getFileStatus(p);
+        FileStatus fs = p.getFileSystem(jobConf).getFileStatus(p);
         st.clear();
-        RaidNode.doRaid(jobconf, policy, fs, st, reporter);
+        RaidNode.doRaid(jobConf, policy, fs, st, context);
 
         ++succeedcount;
 
-        reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
-        reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
-        reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
-        reporter.incrCounter(Counter.META_SIZE, st.metaSize);
-
-        reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
+        context.getCounter(Counter.PROCESSED_BLOCKS).increment(st.numProcessedBlocks);
+        context.getCounter(Counter.PROCESSED_SIZE).increment(st.processedSize);
+        context.getCounter(Counter.META_BLOCKS).increment(st.numMetaBlocks);
+        context.getCounter(Counter.META_SIZE).increment(st.metaSize);
+        context.getCounter(Counter.FILES_SUCCEEDED).increment(1);
       } catch (IOException e) {
         ++failcount;
-        reporter.incrCounter(Counter.FILES_FAILED, 1);
+        context.getCounter(Counter.FILES_FAILED).increment(1);
 
         String s = "FAIL: " + policy + ", " + key + " "
             + StringUtils.stringifyException(e);
-        out.collect(null, new Text(s));
+        context.write(new Text(key), new Text(s));
         LOG.info(s);
       } finally {
-        reporter.setStatus(getCountString());
+        context.setStatus(getCountString());
       }
     }
 
@@ -267,26 +222,27 @@ public class DistRaid {
   }
 
   /**
-   * create new job conf based on configuration passed.
-   * 
+   * Creates a new Job object.
    * @param conf
-   * @return
+   * @return a Job object
+   * @throws IOException
    */
-  static JobConf createJobConf(Configuration conf) {
-    JobConf jobconf = new JobConf(conf, DistRaid.class);
-    jobName = NAME + " " + dateForm.format(new Date(RaidNode.now()));
-    jobconf.setJobName(jobName);
-    jobconf.setMapSpeculativeExecution(false);
-    setSchedulerOption(jobconf);
-
-    jobconf.setJarByClass(DistRaid.class);
-    jobconf.setInputFormat(DistRaidInputFormat.class);
-    jobconf.setOutputKeyClass(Text.class);
-    jobconf.setOutputValueClass(Text.class);
-
-    jobconf.setMapperClass(DistRaidMapper.class);
-    jobconf.setNumReduceTasks(0);
-    return jobconf;
+  static Job createJob(Configuration jobConf) throws IOException {
+    String jobName = NAME + " " + dateForm.format(new Date(RaidNode.now()));
+
+    setSchedulerOption(jobConf);
+
+    Job job = Job.getInstance(jobConf, jobName);
+    job.setSpeculativeExecution(false);
+    job.setJarByClass(DistRaid.class);
+    job.setInputFormatClass(DistRaidInputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setMapperClass(DistRaidMapper.class);
+    job.setNumReduceTasks(0);
+
+    return job;
   }
 
   /** Add paths to be raided */
@@ -294,25 +250,24 @@ public class DistRaid {
     raidPolicyPathPairList.add(new RaidPolicyPathPair(info, paths));
   }
 
-  /** Calculate how many maps to run. */
-  private static int getMapCount(int srcCount, int numNodes) {
-    int numMaps = (int) (srcCount / OP_PER_MAP);
-    numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
-    return Math.max(numMaps, MAX_MAPS_PER_NODE);
-  }
-
   /** Invokes a map-reduce job do parallel raiding.
    *  @return true if the job was started, false otherwise
+   * @throws IOException if the job cannot be created or submitted
    */
   public boolean startDistRaid() throws IOException {
     assert(raidPolicyPathPairList.size() > 0);
-    if (setup()) {
-      this.jobClient = new JobClient(jobconf);
-      this.runningJob = this.jobClient.submitJob(jobconf);
-      LOG.info("Job Started: " + runningJob.getID());
+    Job job = createJob(getConf());
+    createInputFile(job);
+    try {
+      job.submit();
+      this.runningJob = job;
+      LOG.info("Job Started: " + runningJob.getJobID());
       return true;
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    } catch (InterruptedException e) {
+      return false;
     }
-    return false;
   }
 
    /** Checks if the map-reduce job has completed.
@@ -321,76 +276,74 @@ public class DistRaid {
     * @throws IOException
     */
    public boolean checkComplete() throws IOException {
-     JobID jobID = runningJob.getID();
-     if (runningJob.isComplete()) {
-       // delete job directory
-       final String jobdir = jobconf.get(JOB_DIR_LABEL);
-       if (jobdir != null) {
-         final Path jobpath = new Path(jobdir);
-         jobpath.getFileSystem(jobconf).delete(jobpath, true);
-       }
-       if (runningJob.isSuccessful()) {
-         LOG.info("Job Complete(Succeeded): " + jobID);
+     JobID jobID = runningJob.getJobID();
+     try {
+      if (runningJob.isComplete()) {
+         // delete job directory
+         Configuration jobConf = runningJob.getConfiguration();
+         final String jobdir = jobConf.get(JOB_DIR_LABEL);
+         if (jobdir != null) {
+           final Path jobpath = new Path(jobdir);
+           jobpath.getFileSystem(jobConf).delete(jobpath, true);
+         }
+         if (runningJob.isSuccessful()) {
+           LOG.info("Job Complete(Succeeded): " + jobID);
+         } else {
+           LOG.info("Job Complete(Failed): " + jobID);
+         }
+         raidPolicyPathPairList.clear();
+         return true;
        } else {
-         LOG.info("Job Complete(Failed): " + jobID);
-       }
-       raidPolicyPathPairList.clear();
-       return true;
-     } else {
-       String report =  (" job " + jobID +
-         " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)+
-         " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
-       if (!report.equals(lastReport)) {
-         LOG.info(report);
-         lastReport = report;
-       }
-       TaskCompletionEvent[] events =
-         runningJob.getTaskCompletionEvents(jobEventCounter);
-       jobEventCounter += events.length;
-       for(TaskCompletionEvent event : events) {
-         if (event.getTaskStatus() ==  TaskCompletionEvent.Status.FAILED) {
-           LOG.info(" Job " + jobID + " " + event.toString());
+         String report =  (" job " + jobID +
+           " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)+
+           " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
+         if (!report.equals(lastReport)) {
+           LOG.info(report);
+           lastReport = report;
          }
+         return false;
        }
-       return false;
-     }
+    } catch (InterruptedException e) {
+      return false;
+    }
    }
 
    public boolean successful() throws IOException {
-     return runningJob.isSuccessful();
+     try {
+      return runningJob.isSuccessful();
+    } catch (InterruptedException e) {
+      return false;
+    }
    }
 
   /**
    * set up input file which has the list of input files.
-   * 
+   *
    * @return boolean
    * @throws IOException
    */
-  private boolean setup() throws IOException {
-    final String randomId = getRandomId();
-    JobClient jClient = new JobClient(jobconf);
-    Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
-
-    LOG.info(JOB_DIR_LABEL + "=" + jobdir);
-    jobconf.set(JOB_DIR_LABEL, jobdir.toString());
-    Path log = new Path(jobdir, "_logs");
+  private void createInputFile(Job job) throws IOException {
+    Configuration jobConf = job.getConfiguration();
+    Path jobDir = new Path(JOB_DIR_LABEL + getRandomId());
+    Path inDir = new Path(jobDir, "in");
+    Path outDir = new Path(jobDir, "out");
+    FileInputFormat.setInputPaths(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    Path opList = new Path(inDir, NAME);
 
+    Configuration tmp = new Configuration(jobConf);
     // The control file should have small size blocks. This helps
     // in spreading out the load from mappers that will be spawned.
-    jobconf.setInt("dfs.blocks.size",  OP_LIST_BLOCK_SIZE);
+    tmp.setInt("dfs.blocks.size",  OP_LIST_BLOCK_SIZE);
+    FileSystem fs = opList.getFileSystem(tmp);
 
-    FileOutputFormat.setOutputPath(jobconf, log);
-    LOG.info("log=" + log);
-
-    // create operation list
-    FileSystem fs = jobdir.getFileSystem(jobconf);
-    Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
-    jobconf.set(OP_LIST_LABEL, opList.toString());
     int opCount = 0, synCount = 0;
     SequenceFile.Writer opWriter = null;
     try {
-      opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
-          PolicyInfo.class, SequenceFile.CompressionType.NONE);
+      opWriter = SequenceFile.createWriter(
+          jobConf, Writer.file(opList), Writer.keyClass(Text.class),
+          Writer.valueClass(PolicyInfo.class),
+          Writer.compression(SequenceFile.CompressionType.NONE));
       for (RaidPolicyPathPair p : raidPolicyPathPairList) {
         // If a large set of files are Raided for the first time, files
         // in the same directory that tend to have the same size will end up
@@ -411,16 +364,10 @@ public class DistRaid {
       if (opWriter != null) {
         opWriter.close();
       }
-      fs.setReplication(opList, OP_LIST_REPLICATION); // increase replication for control file
+      // increase replication for control file
+      fs.setReplication(opList, OP_LIST_REPLICATION);
     }
     raidPolicyPathPairList.clear();
-    
-    jobconf.setInt(OP_COUNT_LABEL, opCount);
     LOG.info("Number of files=" + opCount);
-    jobconf.setNumMapTasks(getMapCount(opCount, new JobClient(jobconf)
-        .getClusterStatus().getTaskTrackers()));
-    LOG.info("jobName= " + jobName + " numMapTasks=" + jobconf.getNumMapTasks());
-    return opCount != 0;
-
   }
 }

Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java?rev=1037944&r1=1037943&r2=1037944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java Mon Nov 22 23:44:34 2010
@@ -21,14 +21,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
 import java.util.Random;
 import java.util.zip.CRC32;
 
@@ -45,7 +38,6 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.raid.protocol.PolicyInfo;
 import org.apache.hadoop.raid.protocol.PolicyList;
 import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
@@ -101,13 +93,15 @@ public class TestRaidNode extends TestCa
     conf.set("raid.server.address", "localhost:0");
 
     // create a dfs and map-reduce cluster
-    final int taskTrackers = 4;
-    final int jobTrackerPort = 60050;
-
-    dfs = new MiniDFSCluster(conf, 6, true, null);
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+    builder.numDataNodes(6);
+    builder.format(true);
+    dfs = builder.build();
     dfs.waitActive();
     fileSys = dfs.getFileSystem();
+    
     namenode = fileSys.getUri().toString();
+    final int taskTrackers = 4;
     mr = new MiniMRCluster(taskTrackers, namenode, 3);
     jobTrackerName = "localhost:" + mr.getJobTrackerPort();
     hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
@@ -291,7 +285,6 @@ public class TestRaidNode extends TestCa
 
       // create an instance of the RaidNode
       cnode = RaidNode.createRaidNode(null, conf);
-
       FileStatus[] listPaths = null;
 
       // wait till file is raided
@@ -368,14 +361,15 @@ public class TestRaidNode extends TestCa
   // greater than or equal to the specified value
   private void doCheckPolicy() throws Exception {
     LOG.info("doCheckPolicy started---------------------------:"); 
-    short srcReplication = 3;
+    short srcReplication = 1;
     long targetReplication = 2;
     long metaReplication = 1;
     long stripeLength = 2;
     long blockSize = 1024;
     int numBlock = 3;
     ConfigBuilder cb = new ConfigBuilder();
-    cb.addPolicy("policy1", "/user/dhruba/policytest", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.addPolicy("policy1", "/user/dhruba/policytest", srcReplication,
+        targetReplication, metaReplication, stripeLength);
     cb.persist();
     Path dir = new Path("/user/dhruba/policytest/");
     Path file1 = new Path(dir + "/file1");
@@ -392,7 +386,7 @@ public class TestRaidNode extends TestCa
       cnode = RaidNode.createRaidNode(null, localConf);
 
       // this file should be picked up RaidNode
-      long crc2 = createOldFile(fileSys, file2, 2, numBlock, blockSize);
+      createOldFile(fileSys, file2, 2, numBlock, blockSize);
       FileStatus[] listPaths = null;
 
       long firstmodtime = 0;
@@ -429,7 +423,7 @@ public class TestRaidNode extends TestCa
       LOG.info("doCheckPolicy all files found in Raid the first time.");
 
       LOG.info("doCheckPolicy: recreating source file");
-      crc2 = createOldFile(fileSys, file2, 2, numBlock, blockSize);
+      createOldFile(fileSys, file2, 2, numBlock, blockSize);
 
       FileStatus st = fileSys.getFileStatus(file2);
       assertTrue(st.getModificationTime() > firstmodtime);
@@ -497,8 +491,10 @@ public class TestRaidNode extends TestCa
 
     createClusters(false);
     ConfigBuilder cb = new ConfigBuilder();
-    cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
-    cb.addPolicy("policy2", "/user/dhruba/raidtest2", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.addPolicy("policy1", "/user/dhruba/raidtest",
+        srcReplication, targetReplication, metaReplication, stripeLength);
+    cb.addPolicy("policy2", "/user/dhruba/raidtest2",
+        srcReplication, targetReplication, metaReplication, stripeLength);
     cb.persist();
 
     RaidNode cnode = null;
@@ -515,13 +511,15 @@ public class TestRaidNode extends TestCa
         for (PolicyInfo p : policyList.getAll()) {
           if (p.getName().equals("policy1")) {
             Path srcPath = new Path("/user/dhruba/raidtest");
+            FileSystem fs = srcPath.getFileSystem(conf);
             assertTrue(p.getSrcPath().equals(
-                srcPath.makeQualified(srcPath.getFileSystem(conf))));
+                srcPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())));
           } else {
             assertTrue(p.getName().equals("policy2"));
             Path srcPath = new Path("/user/dhruba/raidtest2");
+            FileSystem fs = srcPath.getFileSystem(conf);
             assertTrue(p.getSrcPath().equals(
-                srcPath.makeQualified(srcPath.getFileSystem(conf))));
+                srcPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())));
           }
           assertEquals(targetReplication,
                        Integer.parseInt(p.getProperty("targetReplication")));
@@ -542,14 +540,14 @@ public class TestRaidNode extends TestCa
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      this.assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
+      assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
 
       start = System.currentTimeMillis();
       while (dcnode.jobMonitor.jobsSucceeded() < 2 &&
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      this.assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
+      assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
 
       LOG.info("Test testDistRaid successful.");
       
@@ -643,7 +641,8 @@ public class TestRaidNode extends TestCa
 
     createClusters(false);
     ConfigBuilder cb = new ConfigBuilder();
-    cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+    cb.addPolicy("policy1", "/user/dhruba/raidtest",
+        srcReplication, targetReplication, metaReplication, stripeLength);
     cb.persist();
 
     RaidNode cnode = null;
@@ -680,8 +679,8 @@ public class TestRaidNode extends TestCa
              System.currentTimeMillis() - start < MAX_WAITTIME) {
         Thread.sleep(1000);
       }
-      this.assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
-      this.assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
+      assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
+      assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
 
       LOG.info("Test testSuspendTraversal successful.");
 
@@ -695,11 +694,12 @@ public class TestRaidNode extends TestCa
     LOG.info("Test testSuspendTraversal completed.");
   }
 
-  public void testSchedulerOption() {
+  public void testSchedulerOption() throws IOException {
     Configuration conf = new Configuration();
     conf.set("raid.scheduleroption",
       "mapred.fairscheduler.pool:dummy,foo:bar");
-    org.apache.hadoop.mapred.JobConf jobConf = DistRaid.createJobConf(conf);
+    org.apache.hadoop.mapreduce.Job job = DistRaid.createJob(conf);
+    Configuration jobConf = job.getConfiguration();
     assertEquals("dummy", jobConf.get("mapred.fairscheduler.pool"));
     assertEquals("bar", jobConf.get("foo"));
   }