Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2010/07/02 00:17:37 UTC

svn commit: r959795 - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/fs/slive/ src/test/mapred/org/apache/hadoop/test/

Author: shv
Date: Thu Jul  1 22:17:37 2010
New Revision: 959795

URL: http://svn.apache.org/viewvc?rev=959795&view=rev
Log:
MAPREDUCE-1893. Slive with multiple reducers. Contributed by Konstantin Shvachko.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java   (with props)
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul  1 22:17:37 2010
@@ -70,6 +70,8 @@ Trunk (unreleased changes)
     MAPREDUCE-1850. Includes job submit host information (name and ip) in
     jobconf and jobdetails display (Krishna Ramachandran via amareshwari)
 
+    MAPREDUCE-1893. Slive with multiple reducers. (shv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java Thu Jul  1 22:17:37 2010
@@ -138,6 +138,7 @@ class ArgumentParser {
   private Options getOptions() {
     Options cliopt = new Options();
     cliopt.addOption(ConfigOption.MAPS);
+    cliopt.addOption(ConfigOption.REDUCES);
     cliopt.addOption(ConfigOption.PACKET_SIZE);
     cliopt.addOption(ConfigOption.OPS);
     cliopt.addOption(ConfigOption.DURATION);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java Thu Jul  1 22:17:37 2010
@@ -134,7 +134,8 @@ class ConfigExtractor {
    * @return the number of reducers to use
    */
   Integer getReducerAmount() {
-    return 1;
+    // should be slive.reduces
+    return getInteger(null, ConfigOption.REDUCES);
   }
 
   /**
@@ -670,6 +671,7 @@ class ConfigExtractor {
       LOG.info(duration);
     }
     LOG.info("Map amount = " + cfg.getMapAmount());
+    LOG.info("Reducer amount = " + cfg.getReducerAmount());
     LOG.info("Operation amount = " + cfg.getOpCount());
     LOG.info("Total file limit = " + cfg.getTotalFiles());
     LOG.info("Total dir file limit = " + cfg.getDirSize());

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java Thu Jul  1 22:17:37 2010
@@ -218,6 +218,24 @@ class ConfigMerger {
         base.set(ConfigOption.MAPS.getCfgOption(), mapAmount.toString());
       }
     }
+    // overwrite the reducer amount and check to ensure > 0
+    {
+      Integer reduceAmount = null;
+      try {
+        reduceAmount = extractor.getMapAmount(opts.getValue(ConfigOption.REDUCES
+            .getOpt()));
+      } catch (Exception e) {
+        throw new ConfigException(
+            "Error extracting & merging reducer amount", e);
+      }
+      if (reduceAmount != null) {
+        if (reduceAmount <= 0) {
+          throw new ConfigException(
+              "Reducer amount can not be less than or equal to zero");
+        }
+        base.set(ConfigOption.REDUCES.getCfgOption(), reduceAmount.toString());
+      }
+    }
     // overwrite the duration amount and ensure > 0
     {
       Integer duration = null;
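
Each merged setting follows the same shape: parse the command-line value,
validate it (here, reject anything <= 0), then write it back into the base
Configuration under the option's key so downstream readers see one merged
view. Note that this block parses the -reduces value via
extractor.getMapAmount(...); the string parse itself is shared, but a
dedicated helper would keep any configuration fallback pointed at
slive.reduces rather than slive.maps. A hypothetical variant (method name
assumed, not part of this commit):

    // Hypothetical reducer-specific parse helper in ConfigExtractor, so a
    // null command-line value falls back to slive.reduces, not slive.maps:
    Integer getReducerAmount(String primary) {
      return getInteger(primary, ConfigOption.REDUCES);
    }

    // ...which ConfigMerger could then call as:
    reduceAmount = extractor.getReducerAmount(
        opts.getValue(ConfigOption.REDUCES.getOpt()));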

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java Thu Jul  1 22:17:37 2010
@@ -34,7 +34,10 @@ class ConfigOption<T> extends Option {
 
   // command line options and descriptions and config option name
   static final ConfigOption<Integer> MAPS = new ConfigOption<Integer>(
-      "maps", true, "Number of maps", SLIVE_PREFIX + " .maps", 10);
+      "maps", true, "Number of maps", SLIVE_PREFIX + ".maps", 10);
+
+  static final ConfigOption<Integer> REDUCES = new ConfigOption<Integer>(
+      "reduces", true, "Number of reduces", SLIVE_PREFIX + ".reduces", 1);
 
   static final ConfigOption<Integer> OPS = new ConfigOption<Integer>(
       "ops", true, "Max number of operations per map", SLIVE_PREFIX

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java Thu Jul  1 22:17:37 2010
@@ -51,7 +51,7 @@ class Constants {
 
   // program info
   static final String PROG_NAME = SliveTest.class.getSimpleName();
-  static final String PROG_VERSION = "0.0.1";
+  static final String PROG_VERSION = "0.0.2";
 
   // useful constants
   static final int MEGABYTES = 1048576;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java Thu Jul  1 22:17:37 2010
@@ -87,13 +87,6 @@ class OperationOutput {
     this(key.toString(), value);
   }
 
-  OperationOutput(OperationOutput r1) {
-    this.dataType = r1.dataType;
-    this.measurementType = r1.measurementType;
-    this.opType = r1.opType;
-    this.value = r1.value;
-  }
-
   public String toString() {
     return getKeyString() + " (" + this.value + ")";
   }
@@ -230,8 +223,6 @@ class OperationOutput {
 
   /**
    * Gets the output data type of this class.
-   * 
-   * @return
    */
   OutputType getOutputType() {
     return dataType;

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java Thu Jul  1 22:17:37 2010
@@ -59,7 +59,7 @@ class PathFinder {
    *          operations to calculate the file name and path tree
    * @param type
    *          directory or file enumeration
-   * @return
+   * @return Path
    */
   private Path getPath(int curId, int limitPerDir, Type type) {
     if (curId <= 0) {

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java?rev=959795&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java Thu Jul  1 22:17:37 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.slive;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * The partitioner partitions the map output according to the operation type.
+ * The partition number is the hash of the operation type modulo the total
+ * number of reducers.
+ */
+@SuppressWarnings("deprecation")
+public class SlivePartitioner implements Partitioner<Text, Text> {
+  @Override // JobConfigurable
+  public void configure(JobConf conf) {}
+
+  @Override // Partitioner
+  public int getPartition(Text key, Text value, int numPartitions) {
+    OperationOutput oo = new OperationOutput(key, value);
+    return oo.getOperationType().hashCode() % numPartitions;
+  }
+}
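
One caveat worth noting: String.hashCode() can be negative, and a Partitioner
must return a value in [0, numPartitions). If getOperationType() ever yields
a string whose hash is negative, the modulo above would go negative too. The
usual guard, as in Hadoop's own HashPartitioner, masks the sign bit first
(a defensive sketch, not what this commit does):

    // Sign-safe variant: mask the sign bit so the result always lands
    // in [0, numPartitions).
    public int getPartition(Text key, Text value, int numPartitions) {
      OperationOutput oo = new OperationOutput(key, value);
      return (oo.getOperationType().hashCode() & Integer.MAX_VALUE)
          % numPartitions;
    }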

Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java Thu Jul  1 22:17:37 2010
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput;
@@ -41,6 +42,8 @@ import org.apache.hadoop.mapred.FileOutp
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Slive test entry point + main program
@@ -51,9 +54,12 @@ import org.apache.hadoop.mapred.TextOutp
  * command line and process them (and merge) and then establish a job which will
  * thereafter run a set of mappers & reducers and then the output of the
  * reduction will be reported on.
+ * 
+ * The number of maps is specified by "slive.maps".
+ * The number of reduces is specified by "slive.reduces".
  */
 @SuppressWarnings("deprecation")
-public class SliveTest {
+public class SliveTest implements Tool {
 
   private static final Log LOG = LogFactory.getLog(SliveTest.class);
 
@@ -63,20 +69,13 @@ public class SliveTest {
     Configuration.addDefaultResource("hdfs-site.xml");
   }
 
-  private String[] args;
   private Configuration base;
 
   public SliveTest(Configuration base) {
-    this.args = null;
-    this.base = base;
-  }
-
-  public SliveTest(String[] args, Configuration base) {
-    this.args = args;
     this.base = base;
   }
 
-  public int run() {
+  public int run(String[] args) {
     ParsedOutput parsedOpts = null;
     try {
       ArgumentParser argHolder = new ArgumentParser(args);
@@ -86,21 +85,20 @@ public class SliveTest {
         return 1;
       }
     } catch (Exception e) {
-      LOG.error("Unable to parse arguments due to error: " + e.getMessage());
-      e.printStackTrace();
+      LOG.error("Unable to parse arguments due to error: ", e);
       return 1;
     }
     LOG.info("Running with option list " + Helper.stringifyArray(args, " "));
     ConfigExtractor config = null;
     try {
       ConfigMerger cfgMerger = new ConfigMerger();
-      Configuration cfg = cfgMerger.getMerged(parsedOpts, getBaseConfig());
+      Configuration cfg = cfgMerger.getMerged(parsedOpts,
+                                              new Configuration(base));
       if (cfg != null) {
         config = new ConfigExtractor(cfg);
       }
     } catch (Exception e) {
-      LOG.error("Unable to merge config due to error: " + e.getMessage());
-      e.printStackTrace();
+      LOG.error("Unable to merge config due to error: ", e);
       return 1;
     }
     if (config == null) {
@@ -111,8 +109,7 @@ public class SliveTest {
       LOG.info("Options are:");
       ConfigExtractor.dumpOptions(config);
     } catch (Exception e) {
-      LOG.error("Unable to dump options due to error: " + e.getMessage());
-      e.printStackTrace();
+      LOG.error("Unable to dump options due to error: ", e);
       return 1;
     }
     boolean jobOk = false;
@@ -121,16 +118,14 @@ public class SliveTest {
       runJob(config);
       jobOk = true;
     } catch (Exception e) {
-      LOG.error("Unable to run job due to error: " + e.getMessage());
-      e.printStackTrace();
+      LOG.error("Unable to run job due to error: ", e);
     }
     if (jobOk) {
       try {
         LOG.info("Reporting on job:");
         writeReport(config);
       } catch (Exception e) {
-        LOG.error("Unable to report on job due to error: " + e.getMessage());
-        e.printStackTrace();
+        LOG.error("Unable to report on job due to error: ", e);
       }
     }
     // attempt cleanup (not critical)
@@ -141,8 +136,7 @@ public class SliveTest {
         LOG.info("Cleaning up job:");
         cleanup(config);
       } catch (Exception e) {
-        LOG.error("Unable to cleanup job due to error: " + e.getMessage());
-        e.printStackTrace();
+        LOG.error("Unable to cleanup job due to error: ", e);
       }
     }
     // all mostly worked
@@ -187,6 +181,7 @@ public class SliveTest {
     job.setInputFormat(DummyInputFormat.class);
     FileOutputFormat.setOutputPath(job, config.getOutputPath());
     job.setMapperClass(SliveMapper.class);
+    job.setPartitionerClass(SlivePartitioner.class);
     job.setReducerClass(SliveReducer.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
@@ -224,36 +219,40 @@ public class SliveTest {
    *           if files can not be opened/closed/read or invalid format
    */
   private void writeReport(ConfigExtractor cfg) throws Exception {
-    Path fn = new Path(cfg.getOutputPath(), String.format(
-        Constants.REDUCER_FILE, "00000"));
-    LOG.info("Writing report using contents of " + fn);
+    Path dn = cfg.getOutputPath();
+    LOG.info("Writing report using contents of " + dn);
     FileSystem fs = FileSystem.get(cfg.getConfig());
+    FileStatus[] reduceFiles = fs.listStatus(dn);
     BufferedReader fileReader = null;
     PrintWriter reportWriter = null;
     try {
-      fileReader = new BufferedReader(new InputStreamReader(
-          new DataInputStream(fs.open(fn))));
-      String line;
-      Map<String, List<OperationOutput>> splitTypes = new TreeMap<String, List<OperationOutput>>();
       List<OperationOutput> noOperations = new ArrayList<OperationOutput>();
-      while ((line = fileReader.readLine()) != null) {
-        String pieces[] = line.split("\t", 2);
-        if (pieces.length == 2) {
-          OperationOutput data = new OperationOutput(pieces[0], pieces[1]);
-          String op = (data.getOperationType());
-          if (op != null) {
-            List<OperationOutput> opList = splitTypes.get(op);
-            if (opList == null) {
-              opList = new ArrayList<OperationOutput>();
+      Map<String, List<OperationOutput>> splitTypes = new TreeMap<String, List<OperationOutput>>();
+      for(FileStatus fn : reduceFiles) {
+        fileReader = new BufferedReader(new InputStreamReader(
+            new DataInputStream(fs.open(fn.getPath()))));
+        String line;
+        while ((line = fileReader.readLine()) != null) {
+          String pieces[] = line.split("\t", 2);
+          if (pieces.length == 2) {
+            OperationOutput data = new OperationOutput(pieces[0], pieces[1]);
+            String op = (data.getOperationType());
+            if (op != null) {
+              List<OperationOutput> opList = splitTypes.get(op);
+              if (opList == null) {
+                opList = new ArrayList<OperationOutput>();
+              }
+              opList.add(data);
+              splitTypes.put(op, opList);
+            } else {
+              noOperations.add(data);
             }
-            opList.add(data);
-            splitTypes.put(op, opList);
           } else {
-            noOperations.add(data);
+            throw new IOException("Unparseable line " + line);
           }
-        } else {
-          throw new IOException("Unparseable line " + line);
         }
+        fileReader.close();
+        fileReader = null;
       }
       File resFile = null;
       if (cfg.getResultFile() != null) {
@@ -284,17 +283,6 @@ public class SliveTest {
   }
 
   /**
-   * Gets the base configuration to use for a "starting" configuration to be
-   * merged with.
-   * 
-   * @return Configuration starting configuration.
-   */
-  private Configuration getBaseConfig() {
-    // ensure a copy is made
-    return new Configuration(base);
-  }
-
-  /**
    * Cleans up the base directory by removing it
    * 
    * @param cfg
@@ -320,10 +308,20 @@ public class SliveTest {
    * @param args
    *          command line options
    */
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     Configuration startCfg = new Configuration(true);
-    SliveTest runner = new SliveTest(args, startCfg);
-    int ec = runner.run();
+    SliveTest runner = new SliveTest(startCfg);
+    int ec = ToolRunner.run(runner, args);
     System.exit(ec);
   }
+
+  @Override // Configurable
+  public Configuration getConf() {
+    return this.base;
+  }
+
+  @Override // Configurable
+  public void setConf(Configuration conf) {
+    this.base = conf;
+  }
 }
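
Moving SliveTest onto Tool/ToolRunner means ToolRunner.run() now pushes the
command line through GenericOptionsParser before run(String[]) sees it, so
the standard Hadoop flags (-conf, -D, -fs, -jt) work in front of the
Slive-specific options, and getConf()/setConf() carry the resulting
Configuration. The contract in miniature (a hypothetical MyTool, not
SliveTest itself):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MyTool extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // -D/-conf already applied here
        // ... build and submit the job from conf ...
        return 0;
      }
      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MyTool(), args));
      }
    }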

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java Thu Jul  1 22:17:37 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.slive.Argume
 import org.apache.hadoop.fs.slive.Constants.OperationType;
 import org.apache.hadoop.fs.slive.DataVerifier.VerifyOutput;
 import org.apache.hadoop.fs.slive.DataWriter.GenerateOutput;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -55,43 +56,43 @@ public class TestSlive {
   private static final String TEST_DATA_PROP = "test.build.data";
 
   private static Configuration getBaseConfig() {
-    return new Configuration();
+    Configuration conf = new Configuration();
+    return conf;
   }
 
-  // gets the test write location according to the coding guidelines
+  /** gets the test write location according to the coding guidelines */
   private static File getWriteLoc() {
-    String writeLoc = System.getProperty(TEST_DATA_PROP);
-    if (writeLoc == null || writeLoc.isEmpty()) {
-      throw new RuntimeException("No " + TEST_DATA_PROP
-          + " system property specified");
-    }
+    String writeLoc = System.getProperty(TEST_DATA_PROP, "build/test/data/");
     return new File(writeLoc, "slive");
   }
 
-  // gets where the MR job places its data + output + results
+  /** gets where the MR job places its data + output + results */
   private static File getFlowLocation() {
     return new File(getWriteLoc(), "flow");
   }
 
-  // gets the test directory which is created
-  // by the mkdir op
+  /** gets the test directory which is created by the mkdir op */
   private static File getTestDir() {
     return new File(getWriteLoc(), "slivedir");
   }
 
-  // gets the test file location
-  // which is used for reading, appending and created
+  /**
+   * gets the test file location
+   * which is used for reading, appending and created
+   */
   private static File getTestFile() {
     return new File(getWriteLoc(), "slivefile");
   }
 
-  // gets the rename file which is used in combination
-  // with the test file to do a rename operation
+  /**
+   * gets the rename file which is used in combination
+   * with the test file to do a rename operation
+   */
   private static File getTestRenameFile() {
     return new File(getWriteLoc(), "slivefile1");
   }
 
-  // gets the MR result file name
+  /** gets the MR result file name */
   private static File getResultFile() {
     return new File(getWriteLoc(), "sliveresfile");
   }
@@ -100,8 +101,7 @@ public class TestSlive {
     return new File(getWriteLoc(), "slivenofile");
   }
 
-  // gets the test program arguments
-  // used for merging and main MR running
+  /** gets the test program arguments used for merging and main MR running */
   private String[] getTestArgs(boolean sleep) {
     List<String> args = new LinkedList<String>();
     // setup the options
@@ -111,7 +111,9 @@ public class TestSlive {
       args.add("-" + ConfigOption.OPS.getOpt());
       args.add(Constants.OperationType.values().length + "");
       args.add("-" + ConfigOption.MAPS.getOpt());
-      args.add("1");
+      args.add("2");
+      args.add("-" + ConfigOption.REDUCES.getOpt());
+      args.add("2");
       args.add("-" + ConfigOption.APPEND_SIZE.getOpt());
       args.add("1M,2M");
       args.add("-" + ConfigOption.BLOCK_SIZE.getOpt());
@@ -197,8 +199,7 @@ public class TestSlive {
     rDelete(getImaginaryFile());
   }
 
-  // cleans up a file or directory
-  // recursively if needbe
+  /** cleans up a file or directory recursively if need be */
   private void rDelete(File place) throws Exception {
     if (place.isFile()) {
       LOG.info("Deleting file " + place);
@@ -208,7 +209,7 @@ public class TestSlive {
     }
   }
 
-  // deletes a dir and its contents
+  /** deletes a dir and its contents */
   private void deleteDir(File dir) throws Exception {
     String fns[] = dir.list();
     // delete contents first
@@ -226,7 +227,8 @@ public class TestSlive {
     ConfigExtractor extractor = getTestConfig(true);
     assertEquals(extractor.getOpCount().intValue(), Constants.OperationType
         .values().length);
-    assertEquals(extractor.getMapAmount().intValue(), 1);
+    assertEquals(extractor.getMapAmount().intValue(), 2);
+    assertEquals(extractor.getReducerAmount().intValue(), 2);
     Range<Long> apRange = extractor.getAppendSize();
     assertEquals(apRange.getLower().intValue(), Constants.MEGABYTES * 1);
     assertEquals(apRange.getUpper().intValue(), Constants.MEGABYTES * 2);
@@ -402,8 +404,8 @@ public class TestSlive {
   @Test
   public void testMRFlow() throws Exception {
     ConfigExtractor extractor = getTestConfig(false);
-    SliveTest s = new SliveTest(getTestArgs(false), getBaseConfig());
-    int ec = s.run();
+    SliveTest s = new SliveTest(getBaseConfig());
+    int ec = ToolRunner.run(s, getTestArgs(false));
     assertTrue(ec == 0);
     String resFile = extractor.getResultFile();
     File fn = new File(resFile);
@@ -470,11 +472,14 @@ public class TestSlive {
     // attempt to read it
     DataVerifier vf = new DataVerifier();
     VerifyOutput vout = new VerifyOutput(0, 0, 0, 0);
+    DataInputStream in = null;
     try {
-      vout = vf
-          .verifyFile(byteAm, new DataInputStream(new FileInputStream(fn)));
+      in = new DataInputStream(new FileInputStream(fn));
+      vout = vf.verifyFile(byteAm, in);
     } catch (Exception e) {
 
+    } finally {
+      if(in != null) in.close();
     }
     assertTrue(vout.getChunksSame() == 0);
   }
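
The verification test now closes its DataInputStream in a finally block
rather than leaking it when verifyFile() throws. Hadoop's IOUtils offers a
null-safe equivalent that also swallows the close-time IOException (shown as
an alternative sketch, not what the test uses):

    import org.apache.hadoop.io.IOUtils;

    DataInputStream in = null;
    try {
      in = new DataInputStream(new FileInputStream(fn));
      vout = vf.verifyFile(byteAm, in);
    } catch (Exception e) {
      // the test deliberately ignores verification failures here
    } finally {
      IOUtils.closeStream(in); // null-safe close, ignores IOException
    }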

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java Thu Jul  1 22:17:37 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.DFSCIOTest;
 import org.apache.hadoop.fs.DistributedFSCheck;
 import org.apache.hadoop.io.FileBench;
 import org.apache.hadoop.fs.JHLogAnalyzer;
+import org.apache.hadoop.fs.slive.SliveTest;
 
 /**
  * Driver for Map-reduce tests.
@@ -86,7 +87,7 @@ public class MapredTestDriver {
           "A benchmark that stresses the namenode.");
       pgd.addClass("testfilesystem", TestFileSystem.class, 
           "A test for FileSystem read/write.");
-      pgd.addClass("TestDFSIO", TestDFSIO.class, 
+      pgd.addClass(TestDFSIO.class.getSimpleName(), TestDFSIO.class, 
           "Distributed i/o benchmark.");
       pgd.addClass("DFSCIOTest", DFSCIOTest.class, "" +
           "Distributed i/o benchmark of libhdfs.");
@@ -98,6 +99,8 @@ public class MapredTestDriver {
           "Text(Input|Output)Format (compressed and uncompressed)");
       pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class, 
           "Job History Log analyzer.");
+      pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class, 
+          "HDFS Stress Test and Live Data Verification.");
     } catch(Throwable e) {
       e.printStackTrace();
     }
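
With the ProgramDriver registration above (and TestDFSIO's entry renamed via
the class's simple name so the two cannot drift apart), Slive is reachable
from the MapReduce test driver by class name. A hypothetical invocation from
a build tree (the jar path and option values are illustrative only):

    bin/hadoop jar build/hadoop-mapred-test.jar SliveTest \
        -maps 10 -reduces 3 -ops 100 -duration 60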