You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2010/07/02 00:17:37 UTC
svn commit: r959795 - in /hadoop/mapreduce/trunk: ./
src/test/mapred/org/apache/hadoop/fs/slive/
src/test/mapred/org/apache/hadoop/test/
Author: shv
Date: Thu Jul 1 22:17:37 2010
New Revision: 959795
URL: http://svn.apache.org/viewvc?rev=959795&view=rev
Log:
MAPREDUCE-1893. Slive with multiple reducers. Contributed by Konstantin Shvachko.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java (with props)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jul 1 22:17:37 2010
@@ -70,6 +70,8 @@ Trunk (unreleased changes)
MAPREDUCE-1850. Includes job submit host information (name and ip) in
jobconf and jobdetails display (Krishna Ramachandran via amareshwari)
+ MAPREDUCE-1893. Slive with multiple reducers. (shv)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java Thu Jul 1 22:17:37 2010
@@ -138,6 +138,7 @@ class ArgumentParser {
private Options getOptions() {
Options cliopt = new Options();
cliopt.addOption(ConfigOption.MAPS);
+ cliopt.addOption(ConfigOption.REDUCES);
cliopt.addOption(ConfigOption.PACKET_SIZE);
cliopt.addOption(ConfigOption.OPS);
cliopt.addOption(ConfigOption.DURATION);
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java Thu Jul 1 22:17:37 2010
@@ -134,7 +134,8 @@ class ConfigExtractor {
* @return the number of reducers to use
*/
Integer getReducerAmount() {
- return 1;
+ // should be slive.reduces
+ return getInteger(null, ConfigOption.REDUCES);
}
/**
@@ -670,6 +671,7 @@ class ConfigExtractor {
LOG.info(duration);
}
LOG.info("Map amount = " + cfg.getMapAmount());
+ LOG.info("Reducer amount = " + cfg.getReducerAmount());
LOG.info("Operation amount = " + cfg.getOpCount());
LOG.info("Total file limit = " + cfg.getTotalFiles());
LOG.info("Total dir file limit = " + cfg.getDirSize());
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java Thu Jul 1 22:17:37 2010
@@ -218,6 +218,24 @@ class ConfigMerger {
base.set(ConfigOption.MAPS.getCfgOption(), mapAmount.toString());
}
}
+ // overwrite the reducer amount and check to ensure > 0
+ {
+ Integer reduceAmount = null;
+ try {
+ reduceAmount = extractor.getMapAmount(opts.getValue(ConfigOption.REDUCES
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging reducer amount", e);
+ }
+ if (reduceAmount != null) {
+ if (reduceAmount <= 0) {
+ throw new ConfigException(
+ "Reducer amount can not be less than or equal to zero");
+ }
+ base.set(ConfigOption.REDUCES.getCfgOption(), reduceAmount.toString());
+ }
+ }
// overwrite the duration amount and ensure > 0
{
Integer duration = null;
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java Thu Jul 1 22:17:37 2010
@@ -34,7 +34,10 @@ class ConfigOption<T> extends Option {
// command line options and descriptions and config option name
static final ConfigOption<Integer> MAPS = new ConfigOption<Integer>(
- "maps", true, "Number of maps", SLIVE_PREFIX + " .maps", 10);
+ "maps", true, "Number of maps", SLIVE_PREFIX + ".maps", 10);
+
+ static final ConfigOption<Integer> REDUCES = new ConfigOption<Integer>(
+ "reduces", true, "Number of reduces", SLIVE_PREFIX + ".reduces", 1);
static final ConfigOption<Integer> OPS = new ConfigOption<Integer>(
"ops", true, "Max number of operations per map", SLIVE_PREFIX
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java Thu Jul 1 22:17:37 2010
@@ -51,7 +51,7 @@ class Constants {
// program info
static final String PROG_NAME = SliveTest.class.getSimpleName();
- static final String PROG_VERSION = "0.0.1";
+ static final String PROG_VERSION = "0.0.2";
// useful constants
static final int MEGABYTES = 1048576;
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java Thu Jul 1 22:17:37 2010
@@ -87,13 +87,6 @@ class OperationOutput {
this(key.toString(), value);
}
- OperationOutput(OperationOutput r1) {
- this.dataType = r1.dataType;
- this.measurementType = r1.measurementType;
- this.opType = r1.opType;
- this.value = r1.value;
- }
-
public String toString() {
return getKeyString() + " (" + this.value + ")";
}
@@ -230,8 +223,6 @@ class OperationOutput {
/**
* Gets the output data type of this class.
- *
- * @return
*/
OutputType getOutputType() {
return dataType;
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java Thu Jul 1 22:17:37 2010
@@ -59,7 +59,7 @@ class PathFinder {
* operations to calculate the file name and path tree
* @param type
* directory or file enumeration
- * @return
+ * @return Path
*/
private Path getPath(int curId, int limitPerDir, Type type) {
if (curId <= 0) {
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java?rev=959795&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java Thu Jul 1 22:17:37 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.slive;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * Partitions the map output by operation type: the partition number is the
+ * non-negative hash of the operation type modulo the number of reducers, so
+ * every record of one operation type is sent to the same reducer.
+ */
+@SuppressWarnings("deprecation")
+public class SlivePartitioner implements Partitioner<Text, Text> {
+  @Override // JobConfigurable
+  public void configure(JobConf conf) {}
+  @Override // Partitioner
+  public int getPartition(Text key, Text value, int numPartitions) {
+    OperationOutput oo = new OperationOutput(key, value);
+    // clear the sign bit: hashCode() may be negative, and so would the partition
+    return (oo.getOperationType().hashCode() & Integer.MAX_VALUE) % numPartitions;
+  }
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SlivePartitioner.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java Thu Jul 1 22:17:37 2010
@@ -33,6 +33,7 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput;
@@ -41,6 +42,8 @@ import org.apache.hadoop.mapred.FileOutp
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
/**
* Slive test entry point + main program
@@ -51,9 +54,12 @@ import org.apache.hadoop.mapred.TextOutp
* command line and process them (and merge) and then establish a job which will
* thereafter run a set of mappers & reducers and then the output of the
* reduction will be reported on.
+ *
+ * The number of maps is specified by "slive.maps".
+ * The number of reduces is specified by "slive.reduces".
*/
@SuppressWarnings("deprecation")
-public class SliveTest {
+public class SliveTest implements Tool {
private static final Log LOG = LogFactory.getLog(SliveTest.class);
@@ -63,20 +69,13 @@ public class SliveTest {
Configuration.addDefaultResource("hdfs-site.xml");
}
- private String[] args;
private Configuration base;
public SliveTest(Configuration base) {
- this.args = null;
- this.base = base;
- }
-
- public SliveTest(String[] args, Configuration base) {
- this.args = args;
this.base = base;
}
- public int run() {
+ public int run(String[] args) {
ParsedOutput parsedOpts = null;
try {
ArgumentParser argHolder = new ArgumentParser(args);
@@ -86,21 +85,20 @@ public class SliveTest {
return 1;
}
} catch (Exception e) {
- LOG.error("Unable to parse arguments due to error: " + e.getMessage());
- e.printStackTrace();
+ LOG.error("Unable to parse arguments due to error: ", e);
return 1;
}
LOG.info("Running with option list " + Helper.stringifyArray(args, " "));
ConfigExtractor config = null;
try {
ConfigMerger cfgMerger = new ConfigMerger();
- Configuration cfg = cfgMerger.getMerged(parsedOpts, getBaseConfig());
+ Configuration cfg = cfgMerger.getMerged(parsedOpts,
+ new Configuration(base));
if (cfg != null) {
config = new ConfigExtractor(cfg);
}
} catch (Exception e) {
- LOG.error("Unable to merge config due to error: " + e.getMessage());
- e.printStackTrace();
+ LOG.error("Unable to merge config due to error: ", e);
return 1;
}
if (config == null) {
@@ -111,8 +109,7 @@ public class SliveTest {
LOG.info("Options are:");
ConfigExtractor.dumpOptions(config);
} catch (Exception e) {
- LOG.error("Unable to dump options due to error: " + e.getMessage());
- e.printStackTrace();
+ LOG.error("Unable to dump options due to error: ", e);
return 1;
}
boolean jobOk = false;
@@ -121,16 +118,14 @@ public class SliveTest {
runJob(config);
jobOk = true;
} catch (Exception e) {
- LOG.error("Unable to run job due to error: " + e.getMessage());
- e.printStackTrace();
+ LOG.error("Unable to run job due to error: ", e);
}
if (jobOk) {
try {
LOG.info("Reporting on job:");
writeReport(config);
} catch (Exception e) {
- LOG.error("Unable to report on job due to error: " + e.getMessage());
- e.printStackTrace();
+ LOG.error("Unable to report on job due to error: ", e);
}
}
// attempt cleanup (not critical)
@@ -141,8 +136,7 @@ public class SliveTest {
LOG.info("Cleaning up job:");
cleanup(config);
} catch (Exception e) {
- LOG.error("Unable to cleanup job due to error: " + e.getMessage());
- e.printStackTrace();
+ LOG.error("Unable to cleanup job due to error: ", e);
}
}
// all mostly worked
@@ -187,6 +181,7 @@ public class SliveTest {
job.setInputFormat(DummyInputFormat.class);
FileOutputFormat.setOutputPath(job, config.getOutputPath());
job.setMapperClass(SliveMapper.class);
+ job.setPartitionerClass(SlivePartitioner.class);
job.setReducerClass(SliveReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
@@ -224,36 +219,40 @@ public class SliveTest {
* if files can not be opened/closed/read or invalid format
*/
private void writeReport(ConfigExtractor cfg) throws Exception {
- Path fn = new Path(cfg.getOutputPath(), String.format(
- Constants.REDUCER_FILE, "00000"));
- LOG.info("Writing report using contents of " + fn);
+ Path dn = cfg.getOutputPath();
+ LOG.info("Writing report using contents of " + dn);
FileSystem fs = FileSystem.get(cfg.getConfig());
+ FileStatus[] reduceFiles = fs.listStatus(dn);
BufferedReader fileReader = null;
PrintWriter reportWriter = null;
try {
- fileReader = new BufferedReader(new InputStreamReader(
- new DataInputStream(fs.open(fn))));
- String line;
- Map<String, List<OperationOutput>> splitTypes = new TreeMap<String, List<OperationOutput>>();
List<OperationOutput> noOperations = new ArrayList<OperationOutput>();
- while ((line = fileReader.readLine()) != null) {
- String pieces[] = line.split("\t", 2);
- if (pieces.length == 2) {
- OperationOutput data = new OperationOutput(pieces[0], pieces[1]);
- String op = (data.getOperationType());
- if (op != null) {
- List<OperationOutput> opList = splitTypes.get(op);
- if (opList == null) {
- opList = new ArrayList<OperationOutput>();
+ Map<String, List<OperationOutput>> splitTypes = new TreeMap<String, List<OperationOutput>>();
+ for(FileStatus fn : reduceFiles) {
+ fileReader = new BufferedReader(new InputStreamReader(
+ new DataInputStream(fs.open(fn.getPath()))));
+ String line;
+ while ((line = fileReader.readLine()) != null) {
+ String pieces[] = line.split("\t", 2);
+ if (pieces.length == 2) {
+ OperationOutput data = new OperationOutput(pieces[0], pieces[1]);
+ String op = (data.getOperationType());
+ if (op != null) {
+ List<OperationOutput> opList = splitTypes.get(op);
+ if (opList == null) {
+ opList = new ArrayList<OperationOutput>();
+ }
+ opList.add(data);
+ splitTypes.put(op, opList);
+ } else {
+ noOperations.add(data);
}
- opList.add(data);
- splitTypes.put(op, opList);
} else {
- noOperations.add(data);
+ throw new IOException("Unparseable line " + line);
}
- } else {
- throw new IOException("Unparseable line " + line);
}
+ fileReader.close();
+ fileReader = null;
}
File resFile = null;
if (cfg.getResultFile() != null) {
@@ -284,17 +283,6 @@ public class SliveTest {
}
/**
- * Gets the base configuration to use for a "starting" configuration to be
- * merged with.
- *
- * @return Configuration starting configuration.
- */
- private Configuration getBaseConfig() {
- // ensure a copy is made
- return new Configuration(base);
- }
-
- /**
* Cleans up the base directory by removing it
*
* @param cfg
@@ -320,10 +308,20 @@ public class SliveTest {
* @param args
* command line options
*/
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
Configuration startCfg = new Configuration(true);
- SliveTest runner = new SliveTest(args, startCfg);
- int ec = runner.run();
+ SliveTest runner = new SliveTest(startCfg);
+ int ec = ToolRunner.run(runner, args);
System.exit(ec);
}
+
+ @Override // Configurable
+ public Configuration getConf() {
+ return this.base;
+ }
+
+ @Override // Configurable
+ public void setConf(Configuration conf) {
+ this.base = conf;
+ }
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java Thu Jul 1 22:17:37 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.slive.Argume
import org.apache.hadoop.fs.slive.Constants.OperationType;
import org.apache.hadoop.fs.slive.DataVerifier.VerifyOutput;
import org.apache.hadoop.fs.slive.DataWriter.GenerateOutput;
+import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
import org.junit.Test;
@@ -55,43 +56,43 @@ public class TestSlive {
private static final String TEST_DATA_PROP = "test.build.data";
private static Configuration getBaseConfig() {
- return new Configuration();
+ Configuration conf = new Configuration();
+ return conf;
}
- // gets the test write location according to the coding guidelines
+ /** gets the test write location according to the coding guidelines */
private static File getWriteLoc() {
- String writeLoc = System.getProperty(TEST_DATA_PROP);
- if (writeLoc == null || writeLoc.isEmpty()) {
- throw new RuntimeException("No " + TEST_DATA_PROP
- + " system property specified");
- }
+ String writeLoc = System.getProperty(TEST_DATA_PROP, "build/test/data/");
return new File(writeLoc, "slive");
}
- // gets where the MR job places its data + output + results
+ /** gets where the MR job places its data + output + results */
private static File getFlowLocation() {
return new File(getWriteLoc(), "flow");
}
- // gets the test directory which is created
- // by the mkdir op
+ /** gets the test directory which is created by the mkdir op */
private static File getTestDir() {
return new File(getWriteLoc(), "slivedir");
}
- // gets the test file location
- // which is used for reading, appending and created
+ /**
+ * gets the test file location
+ * which is used for reading, appending and creating
+ */
private static File getTestFile() {
return new File(getWriteLoc(), "slivefile");
}
- // gets the rename file which is used in combination
- // with the test file to do a rename operation
+ /**
+ * gets the rename file which is used in combination
+ * with the test file to do a rename operation
+ */
private static File getTestRenameFile() {
return new File(getWriteLoc(), "slivefile1");
}
- // gets the MR result file name
+ /** gets the MR result file name */
private static File getResultFile() {
return new File(getWriteLoc(), "sliveresfile");
}
@@ -100,8 +101,7 @@ public class TestSlive {
return new File(getWriteLoc(), "slivenofile");
}
- // gets the test program arguments
- // used for merging and main MR running
+ /** gets the test program arguments used for merging and main MR running */
private String[] getTestArgs(boolean sleep) {
List<String> args = new LinkedList<String>();
// setup the options
@@ -111,7 +111,9 @@ public class TestSlive {
args.add("-" + ConfigOption.OPS.getOpt());
args.add(Constants.OperationType.values().length + "");
args.add("-" + ConfigOption.MAPS.getOpt());
- args.add("1");
+ args.add("2");
+ args.add("-" + ConfigOption.REDUCES.getOpt());
+ args.add("2");
args.add("-" + ConfigOption.APPEND_SIZE.getOpt());
args.add("1M,2M");
args.add("-" + ConfigOption.BLOCK_SIZE.getOpt());
@@ -197,8 +199,7 @@ public class TestSlive {
rDelete(getImaginaryFile());
}
- // cleans up a file or directory
- // recursively if needbe
+ /** cleans up a file or directory recursively if need be */
private void rDelete(File place) throws Exception {
if (place.isFile()) {
LOG.info("Deleting file " + place);
@@ -208,7 +209,7 @@ public class TestSlive {
}
}
- // deletes a dir and its contents
+ /** deletes a dir and its contents */
private void deleteDir(File dir) throws Exception {
String fns[] = dir.list();
// delete contents first
@@ -226,7 +227,8 @@ public class TestSlive {
ConfigExtractor extractor = getTestConfig(true);
assertEquals(extractor.getOpCount().intValue(), Constants.OperationType
.values().length);
- assertEquals(extractor.getMapAmount().intValue(), 1);
+ assertEquals(extractor.getMapAmount().intValue(), 2);
+ assertEquals(extractor.getReducerAmount().intValue(), 2);
Range<Long> apRange = extractor.getAppendSize();
assertEquals(apRange.getLower().intValue(), Constants.MEGABYTES * 1);
assertEquals(apRange.getUpper().intValue(), Constants.MEGABYTES * 2);
@@ -402,8 +404,8 @@ public class TestSlive {
@Test
public void testMRFlow() throws Exception {
ConfigExtractor extractor = getTestConfig(false);
- SliveTest s = new SliveTest(getTestArgs(false), getBaseConfig());
- int ec = s.run();
+ SliveTest s = new SliveTest(getBaseConfig());
+ int ec = ToolRunner.run(s, getTestArgs(false));
assertTrue(ec == 0);
String resFile = extractor.getResultFile();
File fn = new File(resFile);
@@ -470,11 +472,14 @@ public class TestSlive {
// attempt to read it
DataVerifier vf = new DataVerifier();
VerifyOutput vout = new VerifyOutput(0, 0, 0, 0);
+ DataInputStream in = null;
try {
- vout = vf
- .verifyFile(byteAm, new DataInputStream(new FileInputStream(fn)));
+ in = new DataInputStream(new FileInputStream(fn));
+ vout = vf.verifyFile(byteAm, in);
} catch (Exception e) {
+ } finally {
+ if(in != null) in.close();
}
assertTrue(vout.getChunksSame() == 0);
}
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java?rev=959795&r1=959794&r2=959795&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/test/MapredTestDriver.java Thu Jul 1 22:17:37 2010
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.DFSCIOTest;
import org.apache.hadoop.fs.DistributedFSCheck;
import org.apache.hadoop.io.FileBench;
import org.apache.hadoop.fs.JHLogAnalyzer;
+import org.apache.hadoop.fs.slive.SliveTest;
/**
* Driver for Map-reduce tests.
@@ -86,7 +87,7 @@ public class MapredTestDriver {
"A benchmark that stresses the namenode.");
pgd.addClass("testfilesystem", TestFileSystem.class,
"A test for FileSystem read/write.");
- pgd.addClass("TestDFSIO", TestDFSIO.class,
+ pgd.addClass(TestDFSIO.class.getSimpleName(), TestDFSIO.class,
"Distributed i/o benchmark.");
pgd.addClass("DFSCIOTest", DFSCIOTest.class, "" +
"Distributed i/o benchmark of libhdfs.");
@@ -98,6 +99,8 @@ public class MapredTestDriver {
"Text(Input|Output)Format (compressed and uncompressed)");
pgd.addClass(JHLogAnalyzer.class.getSimpleName(), JHLogAnalyzer.class,
"Job History Log analyzer.");
+ pgd.addClass(SliveTest.class.getSimpleName(), SliveTest.class,
+ "HDFS Stress Test and Live Data Verification.");
} catch(Throwable e) {
e.printStackTrace();
}