You are viewing a plain-text version of this content. The hyperlink to its canonical version was lost in the plain-text conversion.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/12/23 19:16:59 UTC
svn commit: r729052 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapreduce/lib/input/
src/mapred/org/apache/hadoop/mapreduce/lib/map/
src/mapred/org/apache/hadoop/mapreduce/lib/output/
Author: omalley
Date: Tue Dec 23 10:16:59 2008
New Revision: 729052
URL: http://svn.apache.org/viewvc?rev=729052&view=rev
Log:
HADOOP-4909. Fix Javadoc and make some of the API more consistent in their
use of the JobContext instead of Configuration. (omalley)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Dec 23 10:16:59 2008
@@ -301,14 +301,18 @@
HADOOP-4753. Refactor gridmix2 to reduce code duplication. (cdouglas)
+ HADOOP-4909. Fix Javadoc and make some of the API more consistent in their
+ use of the JobContext instead of Configuration. (omalley)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
based on the rack/host that has the most number of bytes.
(Jothi Padmanabhan via ddas)
- HADOOP-4683. Fixes Reduce shuffle scheduler to invoke getMapCompletionEvents
- in a separate thread. (Jothi Padmanabhan via ddas)
+ HADOOP-4683. Fixes Reduce shuffle scheduler to invoke
+ getMapCompletionEvents in a separate thread. (Jothi Padmanabhan
+ via ddas)
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Dec 23 10:16:59 2008
@@ -50,7 +50,7 @@
*/
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
- public static final Log LOG = LogFactory.getLog(FileInputFormat.class);
+ private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
private static final double SPLIT_SLOP = 1.1; // 10% slop
@@ -128,8 +128,13 @@
job.getConfiguration().setLong("mapred.min.split.size", size);
}
- public static long getMinSplitSize(Configuration conf) {
- return conf.getLong("mapred.min.split.size", 1L);
+ /**
+ * Get the minimum split size
+ * @param job the job
+ * @return the minimum number of bytes that can be in a split
+ */
+ public static long getMinSplitSize(JobContext job) {
+ return job.getConfiguration().getLong("mapred.min.split.size", 1L);
}
/**
@@ -142,8 +147,14 @@
job.getConfiguration().setLong("mapred.max.split.size", size);
}
- public static long getMaxSplitSize(Configuration conf) {
- return conf.getLong("mapred.max.split.size", Long.MAX_VALUE);
+ /**
+ * Get the maximum split size.
+ * @param context the job to look at.
+ * @return the maximum number of bytes a split can include
+ */
+ public static long getMaxSplitSize(JobContext context) {
+ return context.getConfiguration().getLong("mapred.max.split.size",
+ Long.MAX_VALUE);
}
/**
@@ -151,7 +162,8 @@
*
* @return the PathFilter instance set for the job, NULL if none has been set.
*/
- public static PathFilter getInputPathFilter(Configuration conf) {
+ public static PathFilter getInputPathFilter(JobContext context) {
+ Configuration conf = context.getConfiguration();
Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
PathFilter.class);
return (filterClass != null) ?
@@ -166,8 +178,8 @@
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
- protected List<FileStatus> listStatus(Configuration job
- ) throws IOException {
+ protected List<FileStatus> listStatus(JobContext job
+ ) throws IOException {
List<FileStatus> result = new ArrayList<FileStatus>();
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
@@ -188,7 +200,7 @@
for (int i=0; i < dirs.length; ++i) {
Path p = dirs[i];
- FileSystem fs = p.getFileSystem(job);
+ FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
@@ -216,11 +228,11 @@
}
- /** Splits files returned by {@link #listStatus(Configuration)} when
- * they're too big.*/
- public List<InputSplit> getSplits(JobContext context
+ /**
+ * Generate the list of files and make them into FileSplits.
+ */
+ public List<InputSplit> getSplits(JobContext job
) throws IOException {
- Configuration job = context.getConfiguration();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
@@ -228,10 +240,10 @@
List<InputSplit> splits = new ArrayList<InputSplit>();
for (FileStatus file: listStatus(job)) {
Path path = file.getPath();
- FileSystem fs = path.getFileSystem(context.getConfiguration());
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(context, path)) {
+ if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
@@ -391,11 +403,11 @@
/**
* Get the list of input {@link Path}s for the map-reduce job.
*
- * @param conf The configuration of the job
+ * @param context The job
* @return the list of input {@link Path}s for the map-reduce job.
*/
- public static Path[] getInputPaths(Configuration conf) {
- String dirs = conf.get("mapred.input.dir", "");
+ public static Path[] getInputPaths(JobContext context) {
+ String dirs = context.getConfiguration().get("mapred.input.dir", "");
String [] list = StringUtils.split(dirs);
Path[] result = new Path[list.length];
for (int i = 0; i < list.length; i++) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/SequenceFileInputFormat.java Tue Dec 23 10:16:59 2008
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -30,6 +29,7 @@
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -43,12 +43,14 @@
return new SequenceFileRecordReader<K,V>();
}
+ @Override
protected long getFormatMinSplitSize() {
return SequenceFile.SYNC_INTERVAL;
}
- protected List<FileStatus> listStatus(Configuration job
- )throws IOException {
+ @Override
+ protected List<FileStatus> listStatus(JobContext job
+ )throws IOException {
List<FileStatus> files = super.listStatus(job);
int len = files.size();
@@ -56,7 +58,7 @@
FileStatus file = files.get(i);
if (file.isDir()) { // it's a MapFile
Path p = file.getPath();
- FileSystem fs = p.getFileSystem(job);
+ FileSystem fs = p.getFileSystem(job.getConfiguration());
// use the data file
files.set(i, fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME)));
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/TextInputFormat.java Tue Dec 23 10:16:59 2008
@@ -41,6 +41,7 @@
return new LineRecordReader();
}
+ @Override
protected boolean isSplitable(JobContext context, Path file) {
CompressionCodec codec =
new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Tue Dec 23 10:16:59 2008
@@ -22,6 +22,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -58,38 +60,71 @@
private Context outer;
private List<MapRunner> runners;
- public static int getNumberOfThreads(Configuration conf) {
- return conf.getInt("mapred.map.multithreadedrunner.threads", 10);
- }
-
- public static void setNumberOfThreads(Configuration conf, int threads) {
- conf.setInt("mapred.map.multithreadedrunner.threads", threads);
- }
-
+ /**
+ * The number of threads in the thread pool that will run the map function.
+ * @param job the job
+ * @return the number of threads
+ */
+ public static int getNumberOfThreads(JobContext job) {
+ return job.getConfiguration().
+ getInt("mapred.map.multithreadedrunner.threads", 10);
+ }
+
+ /**
+ * Set the number of threads in the pool for running maps.
+ * @param job the job to modify
+ * @param threads the new number of threads
+ */
+ public static void setNumberOfThreads(Job job, int threads) {
+ job.getConfiguration().setInt("mapred.map.multithreadedrunner.threads",
+ threads);
+ }
+
+ /**
+ * Get the application's mapper class.
+ * @param <K1> the map's input key type
+ * @param <V1> the map's input value type
+ * @param <K2> the map's output key type
+ * @param <V2> the map's output value type
+ * @param job the job
+ * @return the mapper class to run
+ */
@SuppressWarnings("unchecked")
public static <K1,V1,K2,V2>
- Class<Mapper<K1,V1,K2,V2>> getMapperClass(Configuration conf) {
+ Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
return (Class<Mapper<K1,V1,K2,V2>>)
- conf.getClass("mapred.map.multithreadedrunner.class",
- Mapper.class);
+ job.getConfiguration().getClass("mapred.map.multithreadedrunner.class",
+ Mapper.class);
}
+ /**
+ * Set the application's mapper class.
+ * @param <K1> the map input key type
+ * @param <V1> the map input value type
+ * @param <K2> the map output key type
+ * @param <V2> the map output value type
+ * @param job the job to modify
+ * @param cls the class to use as the mapper
+ */
public static <K1,V1,K2,V2>
- void setMapperClass(Configuration conf,
+ void setMapperClass(Job job,
Class<Mapper<K1,V1,K2,V2>> cls) {
if (MultithreadedMapper.class.isAssignableFrom(cls)) {
throw new IllegalArgumentException("Can't have recursive " +
"MultithreadedMapper instances.");
}
- conf.setClass("mapred.map.multithreadedrunner.class", cls, Mapper.class);
+ job.getConfiguration().setClass("mapred.map.multithreadedrunner.class",
+ cls, Mapper.class);
}
+ /**
+ * Run the application's maps using a thread pool.
+ */
@Override
public void run(Context context) throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
outer = context;
- int numberOfThreads = getNumberOfThreads(conf);
- mapClass = getMapperClass(conf);
+ int numberOfThreads = getNumberOfThreads(context);
+ mapClass = getMapperClass(context);
if (LOG.isDebugEnabled()) {
LOG.debug("Configuring multithread runner to use " + numberOfThreads +
" threads");
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/map/TokenCounterMapper.java Tue Dec 23 10:16:59 2008
@@ -25,6 +25,9 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
+/**
+ * Tokenize the input values and emit each word with a count of 1.
+ */
public class TokenCounterMapper extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Tue Dec 23 10:16:59 2008
@@ -47,6 +47,12 @@
private Path outputPath = null;
private Path workPath = null;
+ /**
+ * Create a file output committer
+ * @param outputPath the job's output path
+ * @param context the task's context
+ * @throws IOException
+ */
public FileOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
if (outputPath != null) {
@@ -59,6 +65,11 @@
}
}
+ /**
+ * Create the temporary directory that is the root of all of the task
+ * work directories.
+ * @param context the job's context
+ */
public void setupJob(JobContext context) throws IOException {
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
@@ -69,6 +80,10 @@
}
}
+ /**
+ * Delete the temporary directory, including all of the work directories.
+ * @param context the job's context
+ */
public void cleanupJob(JobContext context) throws IOException {
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
@@ -79,12 +94,20 @@
}
}
+ /**
+ * No task setup required.
+ */
+ @Override
public void setupTask(TaskAttemptContext context) throws IOException {
// FileOutputCommitter's setupTask doesn't do anything. Because the
// temporary task directory is created on demand when the
// task is writing.
}
-
+
+ /**
+ * Move the files from the work directory to the job output directory
+ * @param context the task context
+ */
public void commitTask(TaskAttemptContext context)
throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID();
@@ -103,7 +126,15 @@
}
}
}
-
+
+ /**
+ * Move all of the files from the work directory to the final output
+ * @param context the task context
+ * @param fs the output file system
+ * @param jobOutputDir the final output direcotry
+ * @param taskOutput the work path
+ * @throws IOException
+ */
private void moveTaskOutputs(TaskAttemptContext context,
FileSystem fs,
Path jobOutputDir,
@@ -137,6 +168,10 @@
}
}
+ /**
+ * Delete the work directory
+ */
+ @Override
public void abortTask(TaskAttemptContext context) {
try {
context.progress();
@@ -146,11 +181,20 @@
}
}
+ /**
+ * Find the final name of a given output file, given the job output directory
+ * and the work directory.
+ * @param jobOutputDir the job's output directory
+ * @param taskOutput the specific task output file
+ * @param taskOutputPath the job's work directory
+ * @return the final path for the specific output file
+ * @throws IOException
+ */
private Path getFinalPath(Path jobOutputDir, Path taskOutput,
Path taskOutputPath) throws IOException {
URI taskOutputUri = taskOutput.toUri();
URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
- if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+ if (taskOutputUri == relativePath) {
throw new IOException("Can not get the relative path: base = " +
taskOutputPath + " child = " + taskOutput);
}
@@ -161,6 +205,11 @@
}
}
+ /**
+ * Did this task write any files in the work directory?
+ * @param context the task's context
+ */
+ @Override
public boolean needsTaskCommit(TaskAttemptContext context
) throws IOException {
return workPath != null && outputFileSystem.exists(workPath);
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Tue Dec 23 10:16:59 2008
@@ -59,12 +59,12 @@
/**
* Is the job output compressed?
- * @param conf the {@link Configuration} to look in
+ * @param job the Job to look in
* @return <code>true</code> if the job output should be compressed,
* <code>false</code> otherwise
*/
- public static boolean getCompressOutput(Configuration conf) {
- return conf.getBoolean("mapred.output.compress", false);
+ public static boolean getCompressOutput(JobContext job) {
+ return job.getConfiguration().getBoolean("mapred.output.compress", false);
}
/**
@@ -84,17 +84,17 @@
/**
* Get the {@link CompressionCodec} for compressing the job outputs.
- * @param conf the {@link Configuration} to look in
+ * @param job the {@link Job} to look in
* @param defaultValue the {@link CompressionCodec} to return if not set
* @return the {@link CompressionCodec} to be used to compress the
* job outputs
* @throws IllegalArgumentException if the class was specified, but not found
*/
public static Class<? extends CompressionCodec>
- getOutputCompressorClass(Configuration conf,
+ getOutputCompressorClass(JobContext job,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
-
+ Configuration conf = job.getConfiguration();
String name = conf.get("mapred.output.compression.codec");
if (name != null) {
try {
@@ -109,18 +109,17 @@
}
public abstract RecordWriter<K, V>
- getRecordWriter(TaskAttemptContext context
+ getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException;
- public void checkOutputSpecs(JobContext context
+ public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the output directory is set and not already there
- Configuration job = context.getConfiguration();
Path outDir = getOutputPath(job);
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
- if (outDir.getFileSystem(job).exists(outDir)) {
+ if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
@@ -143,8 +142,8 @@
* @return the {@link Path} to the output directory for the map-reduce job.
* @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
*/
- public static Path getOutputPath(Configuration conf) {
- String name = conf.get("mapred.output.dir");
+ public static Path getOutputPath(JobContext job) {
+ String name = job.getConfiguration().get("mapred.output.dir");
return name == null ? null: new Path(name);
}
@@ -261,7 +260,7 @@
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException {
if (committer == null) {
- Path output = getOutputPath(context.getConfiguration());
+ Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java Tue Dec 23 10:16:59 2008
@@ -28,6 +28,7 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -44,12 +45,13 @@
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
- if (getCompressOutput(conf)) {
+ if (getCompressOutput(context)) {
// find the kind of compression to do
- compressionType = getOutputCompressionType(conf);
+ compressionType = getOutputCompressionType(context);
// find the right codec
- Class<?> codecClass = getOutputCompressorClass(conf, DefaultCodec.class);
+ Class<?> codecClass = getOutputCompressorClass(context,
+ DefaultCodec.class);
codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
}
@@ -80,13 +82,13 @@
/**
* Get the {@link CompressionType} for the output {@link SequenceFile}.
- * @param conf the {@link Configuration}
+ * @param job the {@link Job}
* @return the {@link CompressionType} for the output {@link SequenceFile},
* defaulting to {@link CompressionType#RECORD}
*/
- public static CompressionType getOutputCompressionType(Configuration conf) {
- String val = conf.get("mapred.output.compression.type",
- CompressionType.RECORD.toString());
+ public static CompressionType getOutputCompressionType(JobContext job) {
+ String val = job.getConfiguration().get("mapred.output.compression.type",
+ CompressionType.RECORD.toString());
return CompressionType.valueOf(val);
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java?rev=729052&r1=729051&r2=729052&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java Tue Dec 23 10:16:59 2008
@@ -108,26 +108,27 @@
}
public RecordWriter<K, V>
- getRecordWriter(TaskAttemptContext context
+ getRecordWriter(TaskAttemptContext job
) throws IOException, InterruptedException {
- Configuration job = context.getConfiguration();
+ Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
- String keyValueSeparator= job.get("mapred.textoutputformat.separator","\t");
+ String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
+ "\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed) {
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
- codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
- Path file = getDefaultWorkFile(context, extension);
- FileSystem fs = file.getFileSystem(job);
+ Path file = getDefaultWorkFile(job, extension);
+ FileSystem fs = file.getFileSystem(conf);
if (!isCompressed) {
- FSDataOutputStream fileOut = fs.create(file, context);
+ FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
- FSDataOutputStream fileOut = fs.create(file, context);
+ FSDataOutputStream fileOut = fs.create(file, false);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);