You are viewing a plain-text version of this content; the canonical (HTML) version is linked from the original mailing-list archive page.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [27/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Sat Nov 28 20:26:01 2009
@@ -23,17 +23,19 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.StringUtils;
/** An {@link OutputCommitter} that commits files specified
- * in job output directory i.e. ${mapred.output.dir}.
+ * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
**/
public class FileOutputCommitter extends OutputCommitter {
@@ -43,6 +45,9 @@
* Temporary directory name
*/
protected static final String TEMP_DIR_NAME = "_temporary";
+ /** Name of the empty marker file that commitJob creates in the output directory. */
+ public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+ /** Boolean config key: whether commitJob writes the _SUCCESS marker (read with a default of true). */
+ static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
private FileSystem outputFileSystem = null;
private Path outputPath = null;
private Path workPath = null;
@@ -80,10 +85,36 @@
}
}
+ /**
+ * Decides whether a successfully committed job should leave a _SUCCESS
+ * marker in its output directory. Governed by
+ * mapreduce.fileoutputcommitter.marksuccessfuljobs, which defaults to true.
+ */
+ private boolean shouldMarkOutputDir(Configuration conf) {
+ final boolean markByDefault = true;
+ return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, markByDefault);
+ }
+
+ /**
+ * Writes an empty _SUCCESS file into the job's output directory.
+ * Does nothing when no output path is configured.
+ * @param context the job's context (not consulted by this method)
+ */
+ private void markOutputDirSuccessful(JobContext context) throws IOException {
+ if (outputPath == null) {
+ return;
+ }
+ Path marker = new Path(outputPath, SUCCEEDED_FILE_NAME);
+ // an empty file is enough: its presence is the completion signal
+ outputFileSystem.create(marker).close();
+ }
+
/**
* Delete the temporary directory, including all of the work directories.
+ * Then, unless mapreduce.fileoutputcommitter.marksuccessfuljobs is set to
+ * false, create a _SUCCESS file in the output directory to mark the job as
+ * successful.
* @param context the job's context
+ * @throws IOException if the cleanup or the creation of the marker fails
*/
+ @Override
+ public void commitJob(JobContext context) throws IOException {
+ // delete the _temporary folder, then (optionally) create the _SUCCESS
+ // marker file in the o/p folder
+ cleanupJob(context);
+ if (shouldMarkOutputDir(context.getConfiguration())) {
+ markOutputDirSuccessful(context);
+ }
+ }
+
+ @Override
+ @Deprecated
public void cleanupJob(JobContext context) throws IOException {
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
@@ -91,10 +122,23 @@
if (fileSys.exists(tmpDir)) {
fileSys.delete(tmpDir, true);
}
+ } else {
+ LOG.warn("Output Path is null in cleanup");
}
}
/**
+ * Delete the temporary directory, including all of the work directories.
+ * Unlike commitJob, no _SUCCESS marker is written; the final {@code state}
+ * does not affect what is removed.
+ * @param context the job's context
+ * @param state the job's final state (not consulted here)
+ */
+ @Override
+ public void abortJob(JobContext context, JobStatus.State state)
+ throws IOException {
+ // delete the _temporary folder
+ cleanupJob(context);
+ }
+
+ /**
* No task setup required.
*/
@Override
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -25,7 +25,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
@@ -42,11 +42,18 @@
/** Construct output file names so that, when an output directory listing is
* sorted lexicographically, positions correspond to output partitions.*/
private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ /** Config key for the base name of task output files (read by getOutputName). */
+ protected static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+ /** Base output name used when BASE_OUTPUT_NAME is not set. */
+ protected static final String PART = "part";
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
}
private FileOutputCommitter committer = null;
+/** Config key: whether the job output is compressed (boolean). */
+public static final String COMPRESS ="mapreduce.output.fileoutputformat.compress";
+/** Config key: class name of the CompressionCodec used for job output. */
+public static final String COMPRESS_CODEC =
+"mapreduce.output.fileoutputformat.compress.codec";
+/** Config key: compression type name (parsed as a CompressionType by SequenceFileOutputFormat). */
+public static final String COMPRESS_TYPE = "mapreduce.output.fileoutputformat.compress.type";
+/** Config key: the job's output directory, stored as a path string. */
+public static final String OUTDIR = "mapreduce.output.fileoutputformat.outputdir";
/**
* Set whether the output of the job is compressed.
@@ -54,7 +61,7 @@
* @param compress should the output of the job be compressed?
*/
public static void setCompressOutput(Job job, boolean compress) {
- job.getConfiguration().setBoolean("mapred.output.compress", compress);
+ // stored under FileOutputFormat.COMPRESS (mapreduce.output.fileoutputformat.compress)
+ job.getConfiguration().setBoolean(FileOutputFormat.COMPRESS, compress);
}
/**
@@ -64,7 +71,8 @@
* <code>false</code> otherwise
*/
public static boolean getCompressOutput(JobContext job) {
- return job.getConfiguration().getBoolean("mapred.output.compress", false);
+ // an unset FileOutputFormat.COMPRESS means uncompressed output
+ return job.getConfiguration().getBoolean(
+ FileOutputFormat.COMPRESS, false);
}
/**
@@ -77,7 +85,7 @@
setOutputCompressorClass(Job job,
Class<? extends CompressionCodec> codecClass) {
setCompressOutput(job, true);
- job.getConfiguration().setClass("mapred.output.compression.codec",
+ job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC,
codecClass,
CompressionCodec.class);
}
@@ -95,7 +103,7 @@
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
Configuration conf = job.getConfiguration();
- String name = conf.get("mapred.output.compression.codec");
+ String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
if (name != null) {
try {
codecClass =
@@ -133,7 +141,7 @@
* the map-reduce job.
*/
public static void setOutputPath(Job job, Path outputDir) {
- job.getConfiguration().set("mapred.output.dir", outputDir.toString());
+ // persisted as a string under FileOutputFormat.OUTDIR
+ job.getConfiguration().set(FileOutputFormat.OUTDIR, outputDir.toString());
}
/**
@@ -143,7 +151,7 @@
* @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
*/
public static Path getOutputPath(JobContext job) {
- String name = job.getConfiguration().get("mapred.output.dir");
+ // name is null when FileOutputFormat.OUTDIR was never set
+ String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
return name == null ? null: new Path(name);
}
@@ -164,11 +172,11 @@
*
* <p>To get around this the Map-Reduce framework helps the application-writer
* out by maintaining a special
- * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt>
+ * <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
* sub-directory for each task-attempt on HDFS where the output of the
* task-attempt goes. On successful completion of the task-attempt the files
- * in the <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> (only)
- * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the
+ * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt> (only)
+ * are <i>promoted</i> to <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the
* framework discards the sub-directory of unsuccessful task-attempts. This
* is completely transparent to the application.</p>
*
@@ -253,8 +261,22 @@
String extension) throws IOException{
FileOutputCommitter committer =
(FileOutputCommitter) getOutputCommitter(context);
- return new Path(committer.getWorkPath(), getUniqueFile(context, "part",
- extension));
+ return new Path(committer.getWorkPath(), getUniqueFile(context,
+ getOutputName(context), extension));
+ }
+
+ /**
+ * Get the base name for output files.
+ *
+ * @param job the job whose configuration is consulted
+ * @return the value of {@code mapreduce.output.basename}, or {@code "part"}
+ * when it is not set
+ */
+ protected static String getOutputName(JobContext job) {
+ Configuration conf = job.getConfiguration();
+ return conf.get(BASE_OUTPUT_NAME, PART);
+ }
+
+ /**
+ * Set the base name used for output files created by this job's tasks.
+ *
+ * @param job the job whose configuration is updated
+ * @param name the base name to use in place of the default {@code "part"}
+ */
+ protected static void setOutputName(JobContext job, String name) {
+ Configuration conf = job.getConfiguration();
+ conf.set(BASE_OUTPUT_NAME, name);
}
public synchronized
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/LazyOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -33,6 +33,8 @@
* A Convenience class that creates output lazily.
*/
public class LazyOutputFormat <K,V> extends FilterOutputFormat<K, V> {
+ /** Config key naming the underlying OutputFormat class that LazyOutputFormat instantiates. */
+ public static final String OUTPUT_FORMAT =
+ "mapreduce.output.lazyoutputformat.outputformat";
/**
* Set the underlying output format for LazyOutputFormat.
* @param job the {@link Job} to modify
@@ -42,7 +44,7 @@
public static void setOutputFormatClass(Job job,
Class<? extends OutputFormat> theClass) {
job.setOutputFormatClass(LazyOutputFormat.class);
- job.getConfiguration().setClass("mapred.lazy.output.format",
+ // remember the real format so the lazy wrapper can instantiate it via reflection later
+ job.getConfiguration().setClass(OUTPUT_FORMAT,
theClass, OutputFormat.class);
}
@@ -50,7 +52,7 @@
private void getBaseOutputFormat(Configuration conf)
throws IOException {
baseOut = ((OutputFormat<K, V>) ReflectionUtils.newInstance(
- conf.getClass("mapred.lazy.output.format", null), conf));
+ conf.getClass(OUTPUT_FORMAT, null), conf));
if (baseOut == null) {
throw new IOException("Output Format not set for LazyOutputFormat");
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileAsBinaryOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -38,6 +38,8 @@
*/
public class SequenceFileAsBinaryOutputFormat
extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
+ /** Config key for the SequenceFile key class; the job's output key class is used when unset. */
+ public static final String KEY_CLASS = "mapreduce.output.seqbinaryoutputformat.key.class";
+ /** Config key for the SequenceFile value class; the job's output value class is used when unset. */
+ public static final String VALUE_CLASS = "mapreduce.output.seqbinaryoutputformat.value.class";
/**
* Inner class used for appendRaw
@@ -83,7 +85,7 @@
*/
static public void setSequenceFileOutputKeyClass(Job job,
Class<?> theClass) {
- job.getConfiguration().setClass("mapred.seqbinary.output.key.class",
+ // stored under KEY_CLASS; the only bound enforced here is Object
+ job.getConfiguration().setClass(KEY_CLASS,
theClass, Object.class);
}
@@ -97,7 +99,7 @@
*/
static public void setSequenceFileOutputValueClass(Job job,
Class<?> theClass) {
- job.getConfiguration().setClass("mapred.seqbinary.output.value.class",
+ // stored under VALUE_CLASS; the only bound enforced here is Object
+ job.getConfiguration().setClass(VALUE_CLASS,
theClass, Object.class);
}
@@ -108,7 +110,7 @@
*/
static public Class<? extends WritableComparable>
getSequenceFileOutputKeyClass(JobContext job) {
- return job.getConfiguration().getClass("mapred.seqbinary.output.key.class",
+ // falls back to the job's output key class when KEY_CLASS is unset
+ return job.getConfiguration().getClass(KEY_CLASS,
job.getOutputKeyClass().asSubclass(WritableComparable.class),
WritableComparable.class);
}
@@ -120,8 +122,7 @@
*/
static public Class<? extends Writable> getSequenceFileOutputValueClass(
JobContext job) {
- return job.getConfiguration().getClass(
- "mapred.seqbinary.output.value.class",
+ // falls back to the job's output value class when VALUE_CLASS is unset
+ return job.getConfiguration().getClass(VALUE_CLASS,
job.getOutputValueClass().asSubclass(Writable.class), Writable.class);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -92,7 +92,7 @@
* defaulting to {@link CompressionType#RECORD}
*/
public static CompressionType getOutputCompressionType(JobContext job) {
- String val = job.getConfiguration().get("mapred.output.compression.type",
+ // RECORD is assumed when FileOutputFormat.COMPRESS_TYPE is unset
+ String val = job.getConfiguration().get(FileOutputFormat.COMPRESS_TYPE,
CompressionType.RECORD.toString());
return CompressionType.valueOf(val);
}
@@ -106,7 +106,7 @@
public static void setOutputCompressionType(Job job,
CompressionType style) {
setCompressOutput(job, true);
- job.getConfiguration().set("mapred.output.compression.type",
+ // stored by name; choosing a type also switches compression on (call above)
+ job.getConfiguration().set(FileOutputFormat.COMPRESS_TYPE,
style.toString());
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -38,6 +38,7 @@
/** An {@link OutputFormat} that writes plain text files. */
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
+ /** Config key for the key/value separator; a tab is used when unset. The spelling is public API. */
+ public static final String SEPERATOR = "mapreduce.output.textoutputformat.separator";
protected static class LineRecordWriter<K, V>
extends RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
@@ -112,8 +113,7 @@
) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
- String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
- "\t");
+ String keyValueSeparator= conf.get(SEPERATOR, "\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/BinaryPartitioner.java Sat Nov 28 20:26:01 2009
@@ -32,11 +32,11 @@
* of the following properties:
* <ul>
* <li>
- * <i>mapred.binary.partitioner.left.offset</i>:
+ * <i>mapreduce.partition.binarypartitioner.left.offset</i>:
* left offset in array (0 by default)
* </li>
* <li>
- * <i>mapred.binary.partitioner.right.offset</i>:
+ * <i>mapreduce.partition.binarypartitioner.right.offset</i>:
* right offset in array (-1 by default)
* </li>
* </ul>
@@ -67,10 +67,10 @@
public class BinaryPartitioner<V> extends Partitioner<BinaryComparable, V>
implements Configurable {
- private static final String LEFT_OFFSET_PROPERTY_NAME =
- "mapred.binary.partitioner.left.offset";
- private static final String RIGHT_OFFSET_PROPERTY_NAME =
- "mapred.binary.partitioner.right.offset";
+ /** Config key for the left offset of the partitioning subarray (0 by default, per the class docs). */
+ public static final String LEFT_OFFSET_PROPERTY_NAME =
+ "mapreduce.partition.binarypartitioner.left.offset";
+ /** Config key for the right offset of the partitioning subarray (-1 by default, per the class docs). */
+ public static final String RIGHT_OFFSET_PROPERTY_NAME =
+ "mapreduce.partition.binarypartitioner.right.offset";
/**
* Set the subarray to be used for partitioning to
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java Sat Nov 28 20:26:01 2009
@@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -131,7 +132,8 @@
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i * splitStep),
- new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+ new TaskAttemptContextImpl(job.getConfiguration(),
+ new TaskAttemptID()));
while (reader.nextKeyValue()) {
samples.add(reader.getCurrentKey());
++records;
@@ -209,7 +211,8 @@
for (int i = 0; i < splitsToSample ||
(i < splits.size() && samples.size() < numSamples); ++i) {
RecordReader<K,V> reader = inf.createRecordReader(splits.get(i),
- new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+ new TaskAttemptContextImpl(job.getConfiguration(),
+ new TaskAttemptID()));
while (reader.nextKeyValue()) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
@@ -277,7 +280,8 @@
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.createRecordReader(
splits.get(i * splitStep),
- new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID()));
+ new TaskAttemptContextImpl(job.getConfiguration(),
+ new TaskAttemptID()));
while (reader.nextKeyValue()) {
++records;
if ((double) kept / records < freq) {
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,8 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
@@ -41,12 +43,13 @@
* of the field); if omitted from pos2, it defaults to 0 (the end of the
* field). opts are ordering options (any of 'nr' as described above).
* We assume that the fields in the key are separated by
- * map.output.key.field.separator.
+ * {@link JobContext#MAP_OUTPUT_KEY_FIELD_SEPERATOR}.
*/
public class KeyFieldBasedComparator<K, V> extends WritableComparator
implements Configurable {
private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
+ /** Config key holding the -k key specification that setConf parses. */
+ public static final String COMPARATOR_OPTIONS = "mapreduce.partition.keycomparator.options";
private static final byte NEGATIVE = (byte)'-';
private static final byte ZERO = (byte)'0';
private static final byte DECIMAL = (byte)'.';
@@ -54,8 +57,8 @@
public void setConf(Configuration conf) {
this.conf = conf;
- String option = conf.get("mapred.text.key.comparator.options");
- String keyFieldSeparator = conf.get("map.output.key.field.separator","\t");
+ // option is null when COMPARATOR_OPTIONS is unset; NOTE(review): presumably
+ // parseOption tolerates null - confirm. The separator defaults to a tab.
+ String option = conf.get(COMPARATOR_OPTIONS);
+ String keyFieldSeparator = conf.get(JobContext.MAP_OUTPUT_KEY_FIELD_SEPERATOR,"\t");
keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
keyFieldHelper.parseOption(option);
}
@@ -338,4 +341,31 @@
}
return true;
}
+ /**
+ * Set the {@link KeyFieldBasedComparator} options used to compare keys.
+ *
+ * @param job the job whose configuration receives the options
+ * @param keySpec the key specification of the form -k pos1[,pos2], where
+ * pos is of the form f[.c][opts]: f is the number of the key field to use
+ * and c is the number of the first character from the beginning of the
+ * field. Fields and character positions are numbered starting with 1; a
+ * character position of zero in pos2 indicates the field's last character.
+ * If '.c' is omitted from pos1, it defaults to 1 (the beginning of the
+ * field); if omitted from pos2, it defaults to 0 (the end of the field).
+ * opts are ordering options. The supported options are:
+ * <ul>
+ * <li>-n (sort numerically)</li>
+ * <li>-r (reverse the result of comparison)</li>
+ * </ul>
+ */
+ public static void setKeyFieldComparatorOptions(Job job, String keySpec) {
+ Configuration conf = job.getConfiguration();
+ conf.set(COMPARATOR_OPTIONS, keySpec);
+ }
+
+ /**
+ * Get the {@link KeyFieldBasedComparator} options.
+ *
+ * @param job the job whose configuration is read
+ * @return the key specification, or {@code null} if none has been set
+ */
+ public static String getKeyFieldComparatorOption(JobContext job) {
+ Configuration conf = job.getConfiguration();
+ return conf.get(COMPARATOR_OPTIONS);
+ }
+
+
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,8 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.partition.KeyFieldHelper.KeyDescription;
@@ -46,6 +48,8 @@
private static final Log LOG = LogFactory.getLog(
KeyFieldBasedPartitioner.class.getName());
+ /** Config key holding the -k key specification used to pick the partitioning fields. */
+ public static final String PARTITIONER_OPTIONS =
+ "mapreduce.partition.keypartitioner.options";
private int numOfPartitionFields;
private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
@@ -55,15 +59,15 @@
public void setConf(Configuration conf) {
this.conf = conf;
String keyFieldSeparator =
- conf.get("map.output.key.field.separator", "\t");
+ conf.get(JobContext.MAP_OUTPUT_KEY_FIELD_SEPERATOR, "\t");
keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
+ // The deprecated field count, when present, wins over PARTITIONER_OPTIONS:
+ // it is passed to keyFieldHelper.setKeyFieldSpec(1, N) instead of parsing a spec.
if (conf.get("num.key.fields.for.partition") != null) {
LOG.warn("Using deprecated num.key.fields.for.partition. " +
- "Use mapred.text.key.partitioner.options instead");
+ "Use mapreduce.partition.keypartitioner.options instead");
this.numOfPartitionFields = conf.getInt("num.key.fields.for.partition",0);
keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
} else {
- String option = conf.get("mapred.text.key.partitioner.options");
+ String option = conf.get(PARTITIONER_OPTIONS);
keyFieldHelper.parseOption(option);
}
}
@@ -119,4 +123,30 @@
protected int getPartition(int hash, int numReduceTasks) {
+ // clearing the sign bit keeps the result in [0, numReduceTasks) for negative hashes (assumes numReduceTasks > 0)
return (hash & Integer.MAX_VALUE) % numReduceTasks;
}
+
+ /**
+ * Set the {@link KeyFieldBasedPartitioner} options used for
+ * {@link Partitioner}.
+ *
+ * <p>Note: this is an instance method, although it only touches the
+ * configuration of the supplied job.
+ *
+ * @param job the job whose configuration receives the options
+ * @param keySpec the key specification of the form -k pos1[,pos2], where
+ * pos is of the form f[.c][opts]: f is the number of the key field to use
+ * and c is the number of the first character from the beginning of the
+ * field. Fields and character positions are numbered starting with 1; a
+ * character position of zero in pos2 indicates the field's last character.
+ * If '.c' is omitted from pos1, it defaults to 1 (the beginning of the
+ * field); if omitted from pos2, it defaults to 0 (the end of the field).
+ */
+ public void setKeyFieldPartitionerOptions(Job job, String keySpec) {
+ Configuration conf = job.getConfiguration();
+ conf.set(PARTITIONER_OPTIONS, keySpec);
+ }
+
+ /**
+ * Get the {@link KeyFieldBasedPartitioner} options.
+ *
+ * @param job the job whose configuration is read
+ * @return the key specification, or {@code null} if none has been set
+ */
+ public String getKeyFieldPartitionerOption(JobContext job) {
+ Configuration conf = job.getConfiguration();
+ return conf.get(PARTITIONER_OPTIONS);
+ }
+
+
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/lib/partition/TotalOrderPartitioner.java Sat Nov 28 20:26:01 2009
@@ -45,6 +45,12 @@
private Node partitions;
public static final String DEFAULT_PATH = "_partition.lst";
+ /** Config key for the partition file location; DEFAULT_PATH is used when unset. */
+ public static final String PARTITIONER_PATH =
+ "mapreduce.totalorderpartitioner.path";
+ /** Config key capping the depth of the trie built for BinaryComparable keys (200 when unset). */
+ public static final String MAX_TRIE_DEPTH =
+ "mapreduce.totalorderpartitioner.trie.maxdepth";
+ /** Config key: whether BinaryComparable split points are searched with a trie (true when unset). */
+ public static final String NATURAL_ORDER =
+ "mapreduce.totalorderpartitioner.naturalorder";
Configuration conf;
public TotalOrderPartitioner() { }
@@ -83,7 +89,7 @@
}
}
boolean natOrder =
- conf.getBoolean("total.order.partitioner.natural.order", true);
+ conf.getBoolean(NATURAL_ORDER, true);
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[])splitPoints, 0,
splitPoints.length, new byte[0],
@@ -94,7 +100,7 @@
// case where the split points are long and mostly look like bytes
// iii...iixii...iii . Therefore, we make the default depth
// limit large but not huge.
- conf.getInt("total.order.partitioner.max.trie.depth", 200));
+ conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
@@ -119,7 +125,7 @@
* keys in the SequenceFile.
*/
public static void setPartitionFile(Configuration conf, Path p) {
- conf.set("total.order.partitioner.path", p.toString());
+ // stored as a path string under PARTITIONER_PATH
+ conf.set(PARTITIONER_PATH, p.toString());
}
/**
@@ -127,7 +133,7 @@
* @see #setPartitionFile(Configuration, Path)
*/
public static String getPartitionFile(Configuration conf) {
- return conf.get("total.order.partitioner.path", DEFAULT_PATH);
+ // falls back to the relative name DEFAULT_PATH ("_partition.lst") when PARTITIONER_PATH is unset
+ return conf.get(PARTITIONER_PATH, DEFAULT_PATH);
}
/**
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/overview.html
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/overview.html?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/overview.html (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/overview.html Sat Nov 28 20:26:01 2009
@@ -156,7 +156,7 @@
<li>The {@link org.apache.hadoop.mapred.JobTracker} (MapReduce master)
host and port. This is specified with the configuration property
<tt><a
-href="../mapred-default.html#mapred.job.tracker">mapred.job.tracker</a></tt>.
+href="../mapred-default.html#mapreduce.jobtracker.address">mapreduce.jobtracker.address</a></tt>.
</li>
<li>A <em>slaves</em> file that lists the names of all the hosts in
@@ -194,7 +194,7 @@
<xmp><configuration>
<property>
- <name>mapred.job.tracker</name>
+ <name>mapreduce.jobtracker.address</name>
<value>localhost:9001</value>
</property>
@@ -255,7 +255,7 @@
as <tt><em>hdfs://master.example.com/</em></tt> in <tt>conf/core-site.xml</tt>.</li>
-<li>The host and port of the your master server in the value
+<li>The host and port of your master server in the value
-of <tt><a href="../mapred-default.html#mapred.job.tracker">mapred.job.tracker</a></tt>
+of <tt><a href="../mapred-default.html#mapreduce.jobtracker.address">mapreduce.jobtracker.address</a></tt>
as <tt><em>master.example.com</em>:<em>port</em></tt> in <tt>conf/mapred-site.xml</tt>.</li>
<li>Directories for <tt><a
@@ -268,18 +268,18 @@
list of directory names, so that data may be stored on multiple local
devices.</li>
-<li><tt><a href="../mapred-default.html#mapred.local.dir">mapred.local.dir</a></tt>
+<li><tt><a href="../mapred-default.html#mapreduce.cluster.local.dir">mapreduce.cluster.local.dir</a></tt>
in <tt>conf/mapred-site.xml</tt>, the local directory where temporary
MapReduce data is stored. It also may be a list of directories.</li>
<li><tt><a
-href="../mapred-default.html#mapred.map.tasks">mapred.map.tasks</a></tt>
+href="../mapred-default.html#mapreduce.job.maps">mapreduce.job.maps</a></tt>
and <tt><a
-href="../mapred-default.html#mapred.reduce.tasks">mapred.reduce.tasks</a></tt>
+href="../mapred-default.html#mapreduce.job.reduces">mapreduce.job.reduces</a></tt>
in <tt>conf/mapred-site.xml</tt>.
As a rule of thumb, use 10x the
-number of slave processors for <tt>mapred.map.tasks</tt>, and 2x the
-number of slave processors for <tt>mapred.reduce.tasks</tt>.</li>
+number of slave processors for <tt>mapreduce.job.maps</tt>, and 2x the
+number of slave processors for <tt>mapreduce.job.reduces</tt>.</li>
</ol>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/commit-tests Sat Nov 28 20:26:01 2009
@@ -38,4 +38,8 @@
**/TestTaskTrackerBlacklisting.java
**/TestTaskTrackerLocalization
**/TestTrackerDistributedCacheManager
-
+**/TestQueueManager
+**/TestContainerQueue
+**/TestCapacityScheduler
+**/TestRefreshOfQueues
+**/TestQueueManagerRefresh
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/findbugsExcludeFile.xml Sat Nov 28 20:26:01 2009
@@ -16,6 +16,10 @@
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
<Match>
+ <Class name="~.*_jspx" />
+ <Bug pattern="DLS_DEAD_LOCAL_STORE" />
+ </Match>
+ <Match>
<Field name="_jspx_dependants" />
<Bug pattern="UWF_UNWRITTEN_FIELD" />
</Match>
@@ -49,6 +53,10 @@
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
</Match>
<Match>
+ <Class name="~org.apache.hadoop.mapred.*" />
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
+ </Match>
+ <Match>
<Class name="~org.apache.hadoop.mapred.lib.aggregate.*" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
</Match>
@@ -60,6 +68,14 @@
<Class name="org.apache.hadoop.mapred.SequenceFileInputFilter$Filter" />
<Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
</Match>
+ <Match>
+ <Class name="~org.apache.hadoop.util.*" />
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+ </Match>
+ <Match>
+ <Class name="~org.apache.hadoop.filecache.*" />
+ <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
+ </Match>
<!--
Ignore warnings for usage of System.exit. This is
required and have been well thought out
@@ -112,6 +128,20 @@
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.OutputCommitter" />
+ <Or>
+ <Method name="abortJob" />
+ <Method name="commitJob" />
+ <Method name="cleanupJob" />
+ </Or>
+ <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+ </Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat$DBRecordReader" />
+ <Method name="next" />
+ <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+ </Match>
<!--
Ignoring this warning as resolving this would need a non-trivial change in code
-->
@@ -154,6 +184,39 @@
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeThread" />
+ <Field name="inputs" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.JobTracker" />
+ <Method name="updateTaskTrackerStatus" />
+ <Bug pattern="DLS_DEAD_LOCAL_STORE" />
+ </Match>
+
+ <!--
+ This class is unlikely to get subclassed, so ignore
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.mapreduce.task.reduce.MergeManager" />
+ <Bug pattern="SC_START_IN_CTOR" />
+ </Match>
+
+ <!--
+ Do not bother if equals is not implemented. We will not need it here
+ -->
+ <Match>
+ <Class name="org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler$Penalty" />
+ <Bug pattern="EQ_COMPARETO_USE_OBJECT_EQUALS" />
+ </Match>
+
+ <Match>
+ <Class name="org.apache.hadoop.mapred.Task" />
+ <Method name="reportFatalError" />
+ <Bug pattern="DM_EXIT" />
+ </Match>
+
<!--
core changes
-->
@@ -230,4 +293,23 @@
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapred.TaskScheduler$QueueRefresher" />
+ <Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
+ </Match>
+
+ <Match>
+ <Class name="org.apache.hadoop.examples.terasort.TeraInputFormat$1" />
+ <Method name="run" />
+ <Bug pattern="DM_EXIT" />
+ </Match>
+ <Match>
+ <Class name="org.apache.hadoop.examples.terasort.TeraOutputFormat$TeraOutputCommitter"/>
+ <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
+ </Match>
+ <Match>
+ <Class name="org.apache.hadoop.examples.terasort.Unsigned16" />
+ <Method name="getHexDigit"/>
+ <Bug pattern="ICAST_QUESTIONABLE_UNSIGNED_RIGHT_SHIFT" />
+ </Match>
</FindBugsFilter>
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/test/mapred:713112
/hadoop/core/trunk/src/test/mapred:776175-785643
-/hadoop/mapreduce/trunk/src/test/mapred:804974-807678
+/hadoop/mapreduce/trunk/src/test/mapred:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred-site.xml Sat Nov 28 20:26:01 2009
@@ -6,16 +6,16 @@
<configuration>
<property>
- <name>io.sort.mb</name>
+ <name>mapreduce.task.io.sort.mb</name>
<value>10</value>
</property>
<property>
- <name>mapred.hosts.exclude</name>
+ <name>mapreduce.jobtracker.hosts.exclude.filename</name>
<value>hosts.exclude</value>
<description></description>
</property>
<property>
- <name>mapred.job.tracker.retire.jobs</name>
+ <name>mapreduce.jobtracker.retirejobs</name>
<value>false</value>
<description></description>
</property>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/cli/TestMRCLI.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/cli/TestMRCLI.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/cli/TestMRCLI.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/cli/TestMRCLI.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.tools.MRAdmin;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.authorize.HadoopPolicyProvider;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.util.ToolRunner;
@@ -43,7 +44,7 @@
JobConf mrConf = new JobConf(conf);
mrCluster = new MiniMRCluster(1, dfsCluster.getFileSystem().getUri().toString(), 1,
null, null, mrConf);
- jobtracker = mrCluster.createJobConf().get("mapred.job.tracker", "local");
+ jobtracker = mrCluster.createJobConf().get(JTConfig.JT_IPC_ADDRESS, "local");
cmdExecutor = new MRCmdExecutor(jobtracker);
archiveCmdExecutor = new ArchiveCmdExecutor(namenode, mrConf);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestJobConf.java Sat Nov 28 20:26:01 2009
@@ -17,17 +17,19 @@
*/
package org.apache.hadoop.conf;
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
-public class TestJobConf extends TestCase {
+public class TestJobConf {
+ @Test
public void testProfileParamsDefaults() {
JobConf configuration = new JobConf();
- Assert.assertNull(configuration.get("mapred.task.profile.params"));
+ Assert.assertNull(configuration.get(JobContext.TASK_PROFILE_PARAMS));
String result = configuration.getProfileParams();
@@ -36,17 +38,19 @@
Assert.assertTrue(result.startsWith("-agentlib:hprof"));
}
+ @Test
public void testProfileParamsSetter() {
JobConf configuration = new JobConf();
configuration.setProfileParams("test");
- Assert.assertEquals("test", configuration.get("mapred.task.profile.params"));
+ Assert.assertEquals("test", configuration.get(JobContext.TASK_PROFILE_PARAMS));
}
+ @Test
public void testProfileParamsGetter() {
JobConf configuration = new JobConf();
- configuration.set("mapred.task.profile.params", "test");
+ configuration.set(JobContext.TASK_PROFILE_PARAMS, "test");
Assert.assertEquals("test", configuration.getProfileParams());
}
@@ -54,67 +58,108 @@
* Testing mapred.task.maxvmem replacement with new values
*
*/
+ @Test
public void testMemoryConfigForMapOrReduceTask(){
JobConf configuration = new JobConf();
- configuration.set("mapred.job.map.memory.mb",String.valueOf(300));
- configuration.set("mapred.job.reduce.memory.mb",String.valueOf(300));
+ configuration.set(JobContext.MAP_MEMORY_MB,String.valueOf(300));
+ configuration.set(JobContext.REDUCE_MEMORY_MB,String.valueOf(300));
Assert.assertEquals(configuration.getMemoryForMapTask(),300);
Assert.assertEquals(configuration.getMemoryForReduceTask(),300);
configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
- configuration.set("mapred.job.map.memory.mb",String.valueOf(300));
- configuration.set("mapred.job.reduce.memory.mb",String.valueOf(300));
+ configuration.set(JobContext.MAP_MEMORY_MB,String.valueOf(300));
+ configuration.set(JobContext.REDUCE_MEMORY_MB,String.valueOf(300));
Assert.assertEquals(configuration.getMemoryForMapTask(),2);
Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
configuration = new JobConf();
configuration.set("mapred.task.maxvmem" , "-1");
- configuration.set("mapred.job.map.memory.mb",String.valueOf(300));
- configuration.set("mapred.job.reduce.memory.mb",String.valueOf(300));
- Assert.assertEquals(configuration.getMemoryForMapTask(),-1);
- Assert.assertEquals(configuration.getMemoryForReduceTask(),-1);
+ configuration.set(JobContext.MAP_MEMORY_MB,String.valueOf(300));
+ configuration.set(JobContext.REDUCE_MEMORY_MB,String.valueOf(400));
+ Assert.assertEquals(configuration.getMemoryForMapTask(), 300);
+ Assert.assertEquals(configuration.getMemoryForReduceTask(), 400);
configuration = new JobConf();
configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
- configuration.set("mapred.job.map.memory.mb","-1");
- configuration.set("mapred.job.reduce.memory.mb","-1");
+ configuration.set(JobContext.MAP_MEMORY_MB,"-1");
+ configuration.set(JobContext.REDUCE_MEMORY_MB,"-1");
Assert.assertEquals(configuration.getMemoryForMapTask(),2);
Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
configuration = new JobConf();
configuration.set("mapred.task.maxvmem" , String.valueOf(-1));
- configuration.set("mapred.job.map.memory.mb","-1");
- configuration.set("mapred.job.reduce.memory.mb","-1");
+ configuration.set(JobContext.MAP_MEMORY_MB,"-1");
+ configuration.set(JobContext.REDUCE_MEMORY_MB,"-1");
Assert.assertEquals(configuration.getMemoryForMapTask(),-1);
Assert.assertEquals(configuration.getMemoryForReduceTask(),-1);
configuration = new JobConf();
configuration.set("mapred.task.maxvmem" , String.valueOf(2*1024 * 1024));
+ configuration.set(JobContext.MAP_MEMORY_MB, "3");
+ configuration.set(JobContext.REDUCE_MEMORY_MB, "3");
Assert.assertEquals(configuration.getMemoryForMapTask(),2);
Assert.assertEquals(configuration.getMemoryForReduceTask(),2);
+
}
/**
+ * Test that negative values for MAPRED_TASK_MAXVMEM_PROPERTY cause
+ * new configuration keys' values to be used.
+ */
+ @Test
+ public void testNegativeValueForTaskVmem() {
+ JobConf configuration = new JobConf();
+
+ configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-3");
+ configuration.set(JobContext.MAP_MEMORY_MB, "4");
+ configuration.set(JobContext.REDUCE_MEMORY_MB, "5");
+ Assert.assertEquals(4, configuration.getMemoryForMapTask());
+ Assert.assertEquals(5, configuration.getMemoryForReduceTask());
+
+ }
+
+ /**
+ * Test that negative values for all memory configuration properties causes
+ * APIs to disable memory limits
+ */
+ @Test
+ public void testNegativeValuesForMemoryParams() {
+ JobConf configuration = new JobConf();
+
+ configuration.set(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, "-4");
+ configuration.set(JobContext.MAP_MEMORY_MB, "-5");
+ configuration.set(JobContext.REDUCE_MEMORY_MB, "-6");
+
+ Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+ configuration.getMemoryForMapTask());
+ Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+ configuration.getMemoryForReduceTask());
+ Assert.assertEquals(JobConf.DISABLED_MEMORY_LIMIT,
+ configuration.getMaxVirtualMemoryForTask());
+ }
+
+ /**
* Test deprecated accessor and mutator method for mapred.task.maxvmem
*/
+ @Test
public void testMaxVirtualMemoryForTask() {
JobConf configuration = new JobConf();
//get test case
- configuration.set("mapred.job.map.memory.mb", String.valueOf(300));
- configuration.set("mapred.job.reduce.memory.mb", String.valueOf(-1));
+ configuration.set(JobContext.MAP_MEMORY_MB, String.valueOf(300));
+ configuration.set(JobContext.REDUCE_MEMORY_MB, String.valueOf(-1));
Assert.assertEquals(
configuration.getMaxVirtualMemoryForTask(), 300 * 1024 * 1024);
configuration = new JobConf();
- configuration.set("mapred.job.map.memory.mb", String.valueOf(-1));
- configuration.set("mapred.job.reduce.memory.mb", String.valueOf(200));
+ configuration.set(JobContext.MAP_MEMORY_MB, String.valueOf(-1));
+ configuration.set(JobContext.REDUCE_MEMORY_MB, String.valueOf(200));
Assert.assertEquals(
configuration.getMaxVirtualMemoryForTask(), 200 * 1024 * 1024);
configuration = new JobConf();
- configuration.set("mapred.job.map.memory.mb", String.valueOf(-1));
- configuration.set("mapred.job.reduce.memory.mb", String.valueOf(-1));
+ configuration.set(JobContext.MAP_MEMORY_MB, String.valueOf(-1));
+ configuration.set(JobContext.REDUCE_MEMORY_MB, String.valueOf(-1));
configuration.set("mapred.task.maxvmem", String.valueOf(1 * 1024 * 1024));
Assert.assertEquals(
configuration.getMaxVirtualMemoryForTask(), 1 * 1024 * 1024);
@@ -132,10 +177,12 @@
Assert.assertEquals(configuration.getMemoryForReduceTask(), 2);
configuration = new JobConf();
- configuration.set("mapred.job.map.memory.mb", String.valueOf(300));
- configuration.set("mapred.job.reduce.memory.mb", String.valueOf(400));
+ configuration.set(JobContext.MAP_MEMORY_MB, String.valueOf(300));
+ configuration.set(JobContext.REDUCE_MEMORY_MB, String.valueOf(400));
configuration.setMaxVirtualMemoryForTask(2 * 1024 * 1024);
Assert.assertEquals(configuration.getMemoryForMapTask(), 2);
Assert.assertEquals(configuration.getMemoryForReduceTask(), 2);
+
+
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/conf/TestNoDefaultsJobConf.java Sat Nov 28 20:26:01 2009
@@ -20,6 +20,7 @@
import junit.framework.Assert;
import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.LongWritable;
@@ -58,7 +59,7 @@
JobConf conf = new JobConf(false);
//seeding JT and NN info into non-defaults (empty jobconf)
- conf.set("mapred.job.tracker", createJobConf().get("mapred.job.tracker"));
+ conf.set(JTConfig.JT_IPC_ADDRESS, createJobConf().get(JTConfig.JT_IPC_ADDRESS));
conf.set("fs.default.name", createJobConf().get("fs.default.name"));
conf.setJobName("mr");
@@ -83,7 +84,7 @@
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(outDir,
- new OutputLogFilter()));
+ new Utils.OutputFileUtils.OutputFilesFilter()));
if (outputFiles.length > 0) {
InputStream is = getFileSystem().open(outputFiles[0]);
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/fs/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -0,0 +1,3 @@
+/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/fs:713112
+/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/fs:776175-785643
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs:807679-884916
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -0,0 +1,3 @@
+/hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/hdfs:713112
+/hadoop/core/trunk/src/test/mapred/org/apache/hadoop/hdfs:776175-785643
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/hdfs:807679-884916
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/ipc/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -0,0 +1,4 @@
+/hadoop/core/branches/branch-0.19/hdfs/src/test/hdfs-with-mr/org/apache/hadoop/ipc:713112
+/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/ipc:776175-784663
+/hadoop/hdfs/branches/HDFS-265/src/test/hdfs-with-mr/org/apache/hadoop/ipc:796829-820463
+/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/ipc:807679-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/BigMapOutput.java Sat Nov 28 20:26:01 2009
@@ -42,7 +42,11 @@
public static final Log LOG =
LogFactory.getLog(BigMapOutput.class.getName());
private static Random random = new Random();
-
+ public static String MIN_KEY = "mapreduce.bmo.minkey";
+ public static String MIN_VALUE = "mapreduce.bmo.minvalue";
+ public static String MAX_KEY = "mapreduce.bmo.maxkey";
+ public static String MAX_VALUE = "mapreduce.bmo.maxvalue";
+
private static void randomizeBytes(byte[] data, int offset, int length) {
for(int i=offset + length - 1; i >= offset; --i) {
data[i] = (byte) random.nextInt(256);
@@ -66,12 +70,12 @@
BytesWritable.class, BytesWritable.class,
CompressionType.NONE);
long numBytesToWrite = fileSizeInMB * 1024 * 1024;
- int minKeySize = conf.getInt("test.bmo.min_key", 10);;
+ int minKeySize = conf.getInt(MIN_KEY, 10);;
int keySizeRange =
- conf.getInt("test.bmo.max_key", 1000) - minKeySize;
- int minValueSize = conf.getInt("test.bmo.min_value", 0);
+ conf.getInt(MAX_KEY, 1000) - minKeySize;
+ int minValueSize = conf.getInt(MIN_VALUE, 0);
int valueSizeRange =
- conf.getInt("test.bmo.max_value", 20000) - minValueSize;
+ conf.getInt(MAX_VALUE, 20000) - minValueSize;
BytesWritable randomKey = new BytesWritable();
BytesWritable randomValue = new BytesWritable();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -96,7 +98,7 @@
throws IOException {
JobConf conf = new JobConf();
dfsCluster = new MiniDFSCluster(conf, NUMBER_OF_NODES, true, null);
- conf.set("mapred.task.tracker.task-controller",
+ conf.set(TTConfig.TT_TASK_CONTROLLER,
MyLinuxTaskController.class.getName());
mrCluster =
new MiniMRCluster(NUMBER_OF_NODES, dfsCluster.getFileSystem().getUri()
@@ -163,7 +165,7 @@
sb.append(",");
}
}
- writer.println(String.format("mapred.local.dir=%s", sb.toString()));
+ writer.println(String.format(MRConfig.LOCAL_DIR + "=%s", sb.toString()));
writer
.println(String.format("hadoop.log.dir=%s", TaskLog.getBaseLogDir()));
@@ -255,7 +257,9 @@
*/
protected void assertOwnerShip(Path outDir, FileSystem fs)
throws IOException {
- for (FileStatus status : fs.listStatus(outDir, new OutputLogFilter())) {
+ for (FileStatus status : fs.listStatus(outDir,
+ new Utils.OutputFileUtils
+ .OutputFilesFilter())) {
String owner = status.getOwner();
String group = status.getGroup();
LOG.info("Ownership of the file is " + status.getPath() + " is " + owner
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ControlledMapReduceJob.java Sat Nov 28 20:26:01 2009
@@ -263,7 +263,7 @@
signalFileDir = new Path(conf.get("signal.dir.path"));
numReducers = conf.getNumReduceTasks();
fs = FileSystem.get(conf);
- String taskAttemptId = conf.get("mapred.task.id");
+ String taskAttemptId = conf.get(JobContext.TASK_ATTEMPT_ID);
if (taskAttemptId != null) {
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptId);
taskNumber = taskAttemptID.getTaskID().getId();
@@ -421,7 +421,7 @@
// Set the following for reduce tasks to be able to be started running
// immediately along with maps.
- conf.set("mapred.reduce.slowstart.completed.maps", String.valueOf(0));
+ conf.set(JobContext.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, String.valueOf(0));
return conf;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Sat Nov 28 20:26:01 2009
@@ -26,9 +26,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
/**
* Utilities used in unit test.
@@ -72,7 +77,7 @@
}
static class FakeJobInProgress extends JobInProgress {
- JobClient.RawSplit[] rawSplits;
+ Job.RawSplit[] rawSplits;
@SuppressWarnings("deprecation")
FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
@@ -80,12 +85,13 @@
this.profile = new JobProfile(jobConf.getUser(), getJobID(),
jobFile.toString(), null, jobConf.getJobName(),
jobConf.getQueueName());
+ this.jobHistory = new FakeJobHistory();
}
@Override
public synchronized void initTasks() throws IOException {
- JobClient.RawSplit[] splits = createSplits();
+ Job.RawSplit[] splits = createSplits();
numMapTasks = splits.length;
createMapTasks(null, splits);
nonRunningMapCache = createCache(splits, maxLevel);
@@ -95,17 +101,17 @@
}
@Override
- JobClient.RawSplit[] createSplits(){
- JobClient.RawSplit[] splits = new JobClient.RawSplit[numMapTasks];
+ Job.RawSplit[] createSplits(){
+ Job.RawSplit[] splits = new Job.RawSplit[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
- splits[i] = new JobClient.RawSplit();
+ splits[i] = new Job.RawSplit();
splits[i].setLocations(new String[0]);
}
return splits;
}
@Override
- protected void createMapTasks(String ignored, JobClient.RawSplit[] splits) {
+ protected void createMapTasks(String ignored, Job.RawSplit[] splits) {
maps = new TaskInProgress[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
maps[i] = new TaskInProgress(getJobID(), "test",
@@ -187,6 +193,7 @@
String taskTracker) {
addRunningTaskToTIP(tip, taskId, new TaskTrackerStatus(taskTracker,
JobInProgress.convertTrackerNameToHostName(taskTracker)), true);
+
TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
@@ -220,7 +227,7 @@
}
static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status,
- boolean initialContact,
+ boolean initialContact, boolean acceptNewTasks,
String tracker, short responseId)
throws IOException {
if (status == null) {
@@ -228,13 +235,61 @@
JobInProgress.convertTrackerNameToHostName(tracker));
}
- jt.heartbeat(status, false, initialContact, false, responseId);
+ jt.heartbeat(status, false, initialContact, acceptNewTasks, responseId);
return ++responseId ;
}
static void establishFirstContact(JobTracker jt, String tracker)
throws IOException {
- sendHeartBeat(jt, null, true, tracker, (short) 0);
+ sendHeartBeat(jt, null, true, false, tracker, (short) 0);
}
+ static class FakeTaskInProgress extends TaskInProgress {
+
+ public FakeTaskInProgress(JobID jobId, String jobFile, int numMaps,
+ int partition, JobTracker jobTracker, JobConf conf, JobInProgress job,
+ int numSlotsRequired) {
+ super(jobId, jobFile, numMaps, partition, jobTracker, conf, job,
+ numSlotsRequired);
+ }
+
+ public FakeTaskInProgress(JobID jobId, String jobFile, RawSplit emptySplit,
+ JobTracker jobTracker, JobConf jobConf,
+ JobInProgress job, int partition, int numSlotsRequired) {
+ super(jobId, jobFile, emptySplit, jobTracker, jobConf, job,
+ partition, numSlotsRequired);
+ }
+
+ @Override
+ synchronized boolean updateStatus(TaskStatus status) {
+ TaskAttemptID taskid = status.getTaskID();
+ taskStatuses.put(taskid, status);
+ return false;
+ }
+ }
+
+ static class FakeJobHistory extends JobHistory {
+ @Override
+ public void init(JobTracker jt,
+ JobConf conf,
+ String hostname,
+ long jobTrackerStartTime) throws IOException { }
+
+ @Override
+ public void initDone(JobConf conf, FileSystem fs) throws IOException { }
+
+ @Override
+ public void markCompleted(org.apache.hadoop.mapreduce.JobID id)
+ throws IOException { }
+
+ @Override
+ public void shutDown() { }
+
+ @Override
+ public void
+ logEvent(HistoryEvent event, org.apache.hadoop.mapreduce.JobID id) { }
+
+ @Override
+ public void closeWriter(org.apache.hadoop.mapreduce.JobID id) { }
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/GenericMRLoadGenerator.java Sat Nov 28 20:26:01 2009
@@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.examples.RandomTextWriter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -89,15 +90,18 @@
job.setOutputValueClass(
Class.forName(argv[++i]).asSubclass(Writable.class));
} else if ("-keepmap".equals(argv[i])) {
- job.set("hadoop.sort.map.keep.percent", argv[++i]);
+ job.set(org.apache.hadoop.mapreduce.
+ GenericMRLoadGenerator.MAP_PRESERVE_PERCENT, argv[++i]);
} else if ("-keepred".equals(argv[i])) {
- job.set("hadoop.sort.reduce.keep.percent", argv[++i]);
+ job.set(org.apache.hadoop.mapreduce.
+ GenericMRLoadGenerator.REDUCE_PRESERVE_PERCENT, argv[++i]);
} else if ("-outdir".equals(argv[i])) {
FileOutputFormat.setOutputPath(job, new Path(argv[++i]));
} else if ("-indir".equals(argv[i])) {
FileInputFormat.addInputPaths(job, argv[++i]);
} else if ("-inFormatIndirect".equals(argv[i])) {
- job.setClass("mapred.indirect.input.format",
+ job.setClass(org.apache.hadoop.mapreduce.
+ GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
Class.forName(argv[++i]).asSubclass(InputFormat.class),
InputFormat.class);
job.setInputFormat(IndirectInputFormat.class);
@@ -133,14 +137,18 @@
// No input dir? Generate random data
System.err.println("No input path; ignoring InputFormat");
confRandom(job);
- } else if (null != job.getClass("mapred.indirect.input.format", null)) {
+ } else if (null != job.getClass(
+ org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
+ null)) {
// specified IndirectInputFormat? Build src list
JobClient jClient = new JobClient(job);
Path sysdir = jClient.getSystemDir();
Random r = new Random();
Path indirInputFile = new Path(sysdir,
Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files");
- job.set("mapred.indirect.input.file", indirInputFile.toString());
+ job.set(
+ org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
+ indirInputFile.toString());
SequenceFile.Writer writer = SequenceFile.createWriter(
sysdir.getFileSystem(job), job, indirInputFile,
LongWritable.class, Text.class,
@@ -249,12 +257,12 @@
}
public void configure(JobConf job) {
- bytesToWrite = job.getLong("test.randomtextwrite.bytes_per_map",
+ bytesToWrite = job.getLong(RandomTextWriter.BYTES_PER_MAP,
1*1024*1024*1024);
- keymin = job.getInt("test.randomtextwrite.min_words_key", 5);
- keymax = job.getInt("test.randomtextwrite.max_words_key", 10);
- valmin = job.getInt("test.randomtextwrite.min_words_value", 5);
- valmax = job.getInt("test.randomtextwrite.max_words_value", 10);
+ keymin = job.getInt(RandomTextWriter.MIN_KEY, 5);
+ keymax = job.getInt(RandomTextWriter.MAX_KEY, 10);
+ valmin = job.getInt(RandomTextWriter.MIN_VALUE, 5);
+ valmax = job.getInt(RandomTextWriter.MAX_VALUE, 10);
}
public void map(Text key, Text val, OutputCollector<Text,Text> output,
@@ -291,19 +299,19 @@
job.setMapperClass(RandomMapOutput.class);
final ClusterStatus cluster = new JobClient(job).getClusterStatus();
- int numMapsPerHost = job.getInt("test.randomtextwrite.maps_per_host", 10);
+ int numMapsPerHost = job.getInt(RandomTextWriter.MAPS_PER_HOST, 10);
long numBytesToWritePerMap =
- job.getLong("test.randomtextwrite.bytes_per_map", 1*1024*1024*1024);
+ job.getLong(RandomTextWriter.BYTES_PER_MAP, 1*1024*1024*1024);
if (numBytesToWritePerMap == 0) {
throw new IOException(
- "Cannot have test.randomtextwrite.bytes_per_map set to 0");
+ "Cannot have " + RandomTextWriter.BYTES_PER_MAP + " set to 0");
}
- long totalBytesToWrite = job.getLong("test.randomtextwrite.total_bytes",
+ long totalBytesToWrite = job.getLong(RandomTextWriter.TOTAL_BYTES,
numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
int numMaps = (int)(totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
- job.setLong("test.randomtextwrite.bytes_per_map", totalBytesToWrite);
+ job.setLong(RandomTextWriter.BYTES_PER_MAP, totalBytesToWrite);
}
job.setNumMapTasks(numMaps);
}
@@ -337,7 +345,9 @@
extends SampleMapReduceBase<K,V> implements Mapper<K,V,K,V> {
public void configure(JobConf job) {
- setKeep(job.getFloat("hadoop.sort.map.keep.percent", (float)100.0) /
+ setKeep(job.getFloat(
+ org.apache.hadoop.mapreduce.GenericMRLoadGenerator.MAP_PRESERVE_PERCENT,
+ (float)100.0) /
(float)100.0);
}
@@ -353,7 +363,8 @@
extends SampleMapReduceBase<K,V> implements Reducer<K,V,K,V> {
public void configure(JobConf job) {
- setKeep(job.getFloat("hadoop.sort.reduce.keep.percent", (float)100.0) /
+ setKeep(job.getFloat(org.apache.hadoop.mapreduce.
+ GenericMRLoadGenerator.REDUCE_PRESERVE_PERCENT, (float)100.0) /
(float)100.0);
}
@@ -401,7 +412,9 @@
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
- Path src = new Path(job.get("mapred.indirect.input.file", null));
+ Path src = new Path(job.get(
+ org.apache.hadoop.mapreduce.GenericMRLoadGenerator.INDIRECT_INPUT_FILE,
+ null));
FileSystem fs = src.getFileSystem(job);
ArrayList<IndirectSplit> splits = new ArrayList<IndirectSplit>(numSplits);
@@ -418,7 +431,8 @@
public RecordReader getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
InputFormat indirIF = (InputFormat)ReflectionUtils.newInstance(
- job.getClass("mapred.indirect.input.format",
+ job.getClass(org.apache.hadoop.mapreduce.
+ GenericMRLoadGenerator.INDIRECT_INPUT_FORMAT,
SequenceFileInputFormat.class), job);
IndirectSplit is = ((IndirectSplit)split);
return indirIF.getRecordReader(new FileSplit(is.getPath(), 0,
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRBench.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRBench.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRBench.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MRBench.java Sat Nov 28 20:26:01 2009
@@ -26,16 +26,19 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
/**
* Runs a job multiple times and takes average of all runs.
*/
-public class MRBench {
+public class MRBench extends Configured implements Tool{
private static final Log LOG = LogFactory.getLog(MRBench.class);
private static Path BASE_DIR =
@@ -87,7 +90,7 @@
* an appropriate number of leading '0' (zero) characters. The order of
* generated data is one of ascending, descending, or random.
*/
- public static void generateTextFile(FileSystem fs, Path inputFile,
+ public void generateTextFile(FileSystem fs, Path inputFile,
long numLines, Order sortOrder) throws IOException
{
LOG.info("creating control file: "+numLines+" numLines, "+sortOrder+" sortOrder");
@@ -137,8 +140,9 @@
/**
* Create the job configuration.
*/
- private static JobConf setupJob(int numMaps, int numReduces, String jarFile) {
- JobConf jobConf = new JobConf(MRBench.class);
+ private JobConf setupJob(int numMaps, int numReduces, String jarFile) {
+ JobConf jobConf = new JobConf(getConf());
+ jobConf.setJarByClass(MRBench.class);
FileInputFormat.addInputPath(jobConf, INPUT_DIR);
jobConf.setInputFormat(TextInputFormat.class);
@@ -165,8 +169,7 @@
* Runs a MapReduce task, given number of times. The input to each run
* is the same file.
*/
- private static ArrayList<Long> runJobInSequence(JobConf masterJobConf, int numRuns) throws IOException {
- Path intrimData = null;
+ private ArrayList<Long> runJobInSequence(JobConf masterJobConf, int numRuns) throws IOException {
Random rand = new Random();
ArrayList<Long> execTimes = new ArrayList<Long>();
@@ -204,7 +207,13 @@
* [-verbose]
* </pre>
*/
- public static void main (String[] args) throws IOException {
+ public static void main (String[] args) throws Exception {
+ int res = ToolRunner.run(new MRBench(), args);
+ System.exit(res);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
String version = "MRBenchmark.0.0.2";
System.out.println(version);
@@ -265,7 +274,7 @@
inputSortOrder == null)
{
System.err.println(usage);
- System.exit(-1);
+ return -1;
}
JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
@@ -303,6 +312,7 @@
System.out.println("DataLines\tMaps\tReduces\tAvgTime (milliseconds)");
System.out.println(inputLines + "\t\t" + numMaps + "\t" +
numReduces + "\t" + avgTime);
+ return 0;
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Sat Nov 28 20:26:01 2009
@@ -28,7 +28,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -64,7 +67,7 @@
/**
* An inner class that runs a job tracker.
*/
- class JobTrackerRunner implements Runnable {
+ public class JobTrackerRunner implements Runnable {
private JobTracker tracker = null;
private volatile boolean isActive = true;
@@ -107,7 +110,7 @@
try {
jc = (jc == null) ? createJobConf() : createJobConf(jc);
File f = new File("build/test/mapred/local").getAbsoluteFile();
- jc.set("mapred.local.dir",f.getAbsolutePath());
+ jc.set(MRConfig.LOCAL_DIR, f.getAbsolutePath());
jc.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
String id =
@@ -160,13 +163,13 @@
conf = createJobConf(cfg);
}
if (hostname != null) {
- conf.set("slave.host.name", hostname);
+ conf.set(TTConfig.TT_HOST_NAME, hostname);
}
- conf.set("mapred.task.tracker.http.address", "0.0.0.0:0");
- conf.set("mapred.task.tracker.report.address",
+ conf.set(TTConfig.TT_HTTP_ADDRESS, "0.0.0.0:0");
+ conf.set(TTConfig.TT_REPORT_ADDRESS,
"127.0.0.1:" + taskTrackerPort);
File localDirBase =
- new File(conf.get("mapred.local.dir")).getAbsoluteFile();
+ new File(conf.get(MRConfig.LOCAL_DIR)).getAbsoluteFile();
localDirBase.mkdirs();
StringBuffer localPath = new StringBuffer();
for(int i=0; i < numDir; ++i) {
@@ -183,8 +186,8 @@
}
localPath.append(localDirs[i]);
}
- conf.set("mapred.local.dir", localPath.toString());
- LOG.info("mapred.local.dir is " + localPath);
+ conf.set(MRConfig.LOCAL_DIR, localPath.toString());
+ LOG.info(MRConfig.LOCAL_DIR + " is " + localPath);
try {
tt = new TaskTracker(conf);
isInitialized = true;
@@ -337,11 +340,11 @@
UnixUserGroupInformation ugi) {
JobConf result = new JobConf(conf);
FileSystem.setDefaultUri(result, namenode);
- result.set("mapred.job.tracker", "localhost:"+jobTrackerPort);
- result.set("mapred.job.tracker.http.address",
+ result.set(JTConfig.JT_IPC_ADDRESS, "localhost:"+jobTrackerPort);
+ result.set(JTConfig.JT_HTTP_ADDRESS,
"127.0.0.1:" + jobTrackerInfoPort);
if (ugi != null) {
- result.set("mapred.system.dir", "/mapred/system");
+ result.set(JTConfig.JT_SYSTEM_DIR, "/mapred/system");
UnixUserGroupInformation.saveToConf(result,
UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
}
@@ -564,13 +567,7 @@
public JobConf getJobTrackerConf() {
return this.conf;
}
-
- /**
- * Get num events recovered
- */
- public int getNumEventsRecovered() {
- return jobTracker.getJobTracker().recoveryManager.totalEventsRecovered();
- }
+
public int getFaultCount(String hostName) {
return jobTracker.getJobTracker().getFaultCount(hostName);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/NotificationTestCase.java Sat Nov 28 20:26:01 2009
@@ -96,7 +96,8 @@
public static class NotificationServlet extends HttpServlet {
public static int counter = 0;
-
+ private static final long serialVersionUID = 1L;
+
protected void doGet(HttpServletRequest req, HttpServletResponse res)
throws ServletException, IOException {
switch (counter) {
@@ -140,8 +141,8 @@
protected JobConf createJobConf() {
JobConf conf = super.createJobConf();
conf.setJobEndNotificationURI(getNotificationUrlTemplate());
- conf.setInt("job.end.retry.attempts", 3);
- conf.setInt("job.end.retry.interval", 200);
+ conf.setInt(JobContext.END_NOTIFICATION_RETRIES, 3);
+ conf.setInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 200);
return conf;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/ReliabilityTest.java Sat Nov 28 20:26:01 2009
@@ -35,6 +35,7 @@
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
@@ -83,7 +84,7 @@
public int run(String[] args) throws Exception {
Configuration conf = getConf();
- if ("local".equals(conf.get("mapred.job.tracker", "local"))) {
+ if ("local".equals(conf.get(JTConfig.JT_IPC_ADDRESS, "local"))) {
displayUsage();
}
String[] otherArgs =
@@ -103,8 +104,8 @@
//to protect against the case of jobs failing even when multiple attempts
//fail, set some high values for the max attempts
- conf.setInt("mapred.map.max.attempts", 10);
- conf.setInt("mapred.reduce.max.attempts", 10);
+ conf.setInt(JobContext.MAP_MAX_ATTEMPTS, 10);
+ conf.setInt(JobContext.REDUCE_MAX_ATTEMPTS, 10);
runSleepJobTest(new JobClient(new JobConf(conf)), conf);
runSortJobTests(new JobClient(new JobConf(conf)), conf);
return 0;
@@ -122,7 +123,7 @@
"-r", Integer.toString(maxReduces),
"-mt", Integer.toString(mapSleepTime),
"-rt", Integer.toString(reduceSleepTime)};
- runTest(jc, conf, "org.apache.hadoop.examples.SleepJob", sleepJobArgs,
+ runTest(jc, conf, "org.apache.hadoop.mapreduce.SleepJob", sleepJobArgs,
new KillTaskThread(jc, 2, 0.2f, false, 2),
new KillTrackerThread(jc, 2, 0.4f, false, 1));
LOG.info("SleepJob done");
@@ -492,4 +493,4 @@
int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args);
System.exit(res);
}
-}
\ No newline at end of file
+}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java Sat Nov 28 20:26:01 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.*;
@@ -55,7 +56,11 @@
static private final IntWritable sortInput = new IntWritable(1);
static private final IntWritable sortOutput = new IntWritable(2);
-
+ static public String SORT_REDUCES =
+ "mapreduce.sortvalidator.sort.reduce.tasks";
+ static public String MAPS_PER_HOST = "mapreduce.sortvalidator.mapsperhost";
+ static public String REDUCES_PER_HOST =
+ "mapreduce.sortvalidator.reducesperhost";
static void printUsage() {
System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
"-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
@@ -64,7 +69,7 @@
static private IntWritable deduceInputFile(JobConf job) {
Path[] inputPaths = FileInputFormat.getInputPaths(job);
- Path inputFile = new Path(job.get("map.input.file"));
+ Path inputFile = new Path(job.get(JobContext.MAP_INPUT_FILE));
// value == one for sort-input; value == two for sort-output
return (inputFile.getParent().equals(inputPaths[0])) ?
@@ -208,12 +213,12 @@
// Figure the 'current' partition and no. of reduces of the 'sort'
try {
- URI inputURI = new URI(job.get("map.input.file"));
+ URI inputURI = new URI(job.get(JobContext.MAP_INPUT_FILE));
String inputFile = inputURI.getPath();
// part file is of the form part-r-xxxxx
partition = Integer.valueOf(inputFile.substring(
inputFile.lastIndexOf("part") + 7)).intValue();
- noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
+ noSortReducers = job.getInt(SORT_REDUCES, -1);
} catch (Exception e) {
System.err.println("Caught: " + e);
System.exit(-1);
@@ -322,7 +327,7 @@
int noSortReduceTasks =
outputfs.listStatus(sortOutput, sortPathsFilter).length;
- jobConf.setInt("sortvalidate.sort.reduce.tasks", noSortReduceTasks);
+ jobConf.setInt(SORT_REDUCES, noSortReduceTasks);
int noSortInputpaths = inputfs.listStatus(sortInput).length;
jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
@@ -347,7 +352,7 @@
FileOutputFormat.setOutputPath(jobConf, outputPath);
// Uncomment to run locally in a single process
- //job_conf.set("mapred.job.tracker", "local");
+ //job_conf.set(JTConfig.JT, "local");
Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
System.out.println("\nSortValidator.RecordStatsChecker: Validate sort " +
"from " + inputPaths[0] + " (" +
@@ -468,11 +473,11 @@
ClusterStatus cluster = client.getClusterStatus();
if (noMaps == -1) {
noMaps = cluster.getTaskTrackers() *
- jobConf.getInt("test.sortvalidate.maps_per_host", 10);
+ jobConf.getInt(MAPS_PER_HOST, 10);
}
if (noReduces == -1) {
noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
- String sortReduces = jobConf.get("test.sortvalidate.reduces_per_host");
+ String sortReduces = jobConf.get(REDUCES_PER_HOST);
if (sortReduces != null) {
noReduces = cluster.getTaskTrackers() *
Integer.parseInt(sortReduces);
@@ -491,7 +496,7 @@
FileOutputFormat.setOutputPath(jobConf, outputPath);
// Uncomment to run locally in a single process
- //job_conf.set("mapred.job.tracker", "local");
+ //job_conf.set(JTConfig.JT, "local");
Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
System.out.println("\nSortValidator.RecordChecker: Running on " +
cluster.getTaskTrackers() +
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestBadRecords.java Sat Nov 28 20:26:01 2009
@@ -72,7 +72,7 @@
conf.setJobName("mr");
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
- conf.setInt("mapred.task.timeout", 30*1000);
+ conf.setInt(JobContext.TASK_TIMEOUT, 30*1000);
SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);
@@ -167,7 +167,7 @@
Path[] outputFiles = FileUtil.stat2Paths(
getFileSystem().listStatus(getOutputDir(),
- new OutputLogFilter()));
+ new Utils.OutputFileUtils.OutputFilesFilter()));
List<String> mapperOutput=getProcessed(input, mapperBadRecords);
LOG.debug("mapperOutput " + mapperOutput.size());