You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [25/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/JobContext.java Sat Nov 28 20:26:01 2009
@@ -19,89 +19,246 @@
package org.apache.hadoop.mapreduce;
import java.io.IOException;
+import java.net.URI;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
-import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* A read-only view of the job that is provided to the tasks while they
* are running.
*/
-public class JobContext {
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface JobContext {
// Put all of the attribute names in here so that Job and JobContext are
// consistent.
- protected static final String INPUT_FORMAT_CLASS_ATTR =
- "mapreduce.inputformat.class";
- protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
- protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
- protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
- protected static final String OUTPUT_FORMAT_CLASS_ATTR =
- "mapreduce.outputformat.class";
- protected static final String PARTITIONER_CLASS_ATTR =
- "mapreduce.partitioner.class";
-
- protected final org.apache.hadoop.mapred.JobConf conf;
- private final JobID jobId;
-
- public JobContext(Configuration conf, JobID jobId) {
- this.conf = new org.apache.hadoop.mapred.JobConf(conf);
- this.jobId = jobId;
- }
+ public static final String INPUT_FORMAT_CLASS_ATTR =
+ "mapreduce.job.inputformat.class";
+ public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+ public static final String COMBINE_CLASS_ATTR =
+ "mapreduce.job.combine.class";
+ public static final String REDUCE_CLASS_ATTR =
+ "mapreduce.job.reduce.class";
+ public static final String OUTPUT_FORMAT_CLASS_ATTR =
+ "mapreduce.job.outputformat.class";
+ public static final String PARTITIONER_CLASS_ATTR =
+ "mapreduce.job.partitioner.class";
+
+ public static final String SETUP_CLEANUP_NEEDED =
+ "mapreduce.job.committer.setup.cleanup.needed";
+ public static final String JAR = "mapreduce.job.jar";
+ public static final String ID = "mapreduce.job.id";
+ public static final String JOB_NAME = "mapreduce.job.name";
+ public static final String USER_NAME = "mapreduce.job.user.name";
+ public static final String PRIORITY = "mapreduce.job.priority";
+ public static final String QUEUE_NAME = "mapreduce.job.queuename";
+ public static final String JVM_NUMTASKS_TORUN =
+ "mapreduce.job.jvm.numtasks";
+ public static final String SPLIT_FILE = "mapreduce.job.splitfile";
+ public static final String NUM_MAPS = "mapreduce.job.maps";
+ public static final String MAX_TASK_FAILURES_PER_TRACKER =
+ "mapreduce.job.maxtaskfailures.per.tracker";
+ public static final String COMPLETED_MAPS_FOR_REDUCE_SLOWSTART =
+ "mapreduce.job.reduce.slowstart.completedmaps";
+ public static final String NUM_REDUCES = "mapreduce.job.reduces";
+ public static final String SKIP_RECORDS = "mapreduce.job.skiprecords";
+ public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
+ public static final String SPECULATIVE_SLOWNODE_THRESHOLD =
+ "mapreduce.job.speculative.slownodethreshold";
+ public static final String SPECULATIVE_SLOWTASK_THRESHOLD =
+ "mapreduce.job.speculative.slowtaskthreshold";
+ public static final String SPECULATIVECAP =
+ "mapreduce.job.speculative.speculativecap";
+ public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
+ public static final String OUTPUT_KEY_CLASS =
+ "mapreduce.job.output.key.class";
+ public static final String OUTPUT_VALUE_CLASS =
+ "mapreduce.job.output.value.class";
+ public static final String KEY_COMPARATOR =
+ "mapreduce.job.output.key.comparator.class";
+ public static final String GROUP_COMPARATOR_CLASS =
+ "mapreduce.job.output.group.comparator.class";
+ public static final String WORKING_DIR = "mapreduce.job.working.dir";
+ public static final String HISTORY_LOCATION =
+ "mapreduce.job.userhistorylocation";
+ public static final String END_NOTIFICATION_URL =
+ "mapreduce.job.end-notification.url";
+ public static final String END_NOTIFICATION_RETRIES =
+ "mapreduce.job.end-notification.retry.attempts";
+ public static final String END_NOTIFICATION_RETRIE_INTERVAL =
+ "mapreduce.job.end-notification.retry.interval";
+ public static final String CLASSPATH_ARCHIVES =
+ "mapreduce.job.classpath.archives";
+ public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
+ public static final String CACHE_FILES = "mapreduce.job.cache.files";
+ public static final String CACHE_ARCHIVES = "mapreduce.job.cache.archives";
+ public static final String CACHE_LOCALFILES =
+ "mapreduce.job.cache.local.files";
+ public static final String CACHE_LOCALARCHIVES =
+ "mapreduce.job.cache.local.archives";
+ public static final String CACHE_FILE_TIMESTAMPS =
+ "mapreduce.job.cache.files.timestamps";
+ public static final String CACHE_ARCHIVES_TIMESTAMPS =
+ "mapreduce.job.cache.archives.timestamps";
+ public static final String CACHE_SYMLINK =
+ "mapreduce.job.cache.symlink.create";
+
+ public static final String IO_SORT_FACTOR =
+ "mapreduce.task.io.sort.factor";
+ public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
+ public static final String PRESERVE_FAILED_TASK_FILES =
+ "mapreduce.task.files.preserve.failedtasks";
+ public static final String PRESERVE_FILES_PATTERN =
+ "mapreduce.task.files.preserve.filepattern";
+ public static final String TASK_TEMP_DIR = "mapreduce.task.tmp.dir";
+ public static final String TASK_DEBUGOUT_LINES =
+ "mapreduce.task.debugout.lines";
+ public static final String RECORDS_BEFORE_PROGRESS =
+ "mapreduce.task.merge.progress.records";
+ public static final String SKIP_START_ATTEMPTS =
+ "mapreduce.task.skip.start.attempts";
+ public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
+ public static final String TASK_ISMAP = "mapreduce.task.ismap";
+ public static final String TASK_PARTITION = "mapreduce.task.partition";
+ public static final String TASK_PROFILE = "mapreduce.task.profile";
+ public static final String TASK_PROFILE_PARAMS =
+ "mapreduce.task.profile.params";
+ public static final String NUM_MAP_PROFILES =
+ "mapreduce.task.profile.maps";
+ public static final String NUM_REDUCE_PROFILES =
+ "mapreduce.task.profile.reduces";
+ public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
+ public static final String TASK_ID = "mapreduce.task.id";
+ public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
+ public static final String TASK_USERLOG_LIMIT =
+ "mapreduce.task.userlog.limit.kb";
+ public static final String TASK_LOG_RETAIN_HOURS =
+ "mapred.task.userlog.retain.hours";
+
+ public static final String MAP_SORT_RECORD_PERCENT =
+ "mapreduce.map.sort.record.percent";
+ public static final String MAP_SORT_SPILL_PERCENT =
+ "mapreduce.map.sort.spill.percent";
+ public static final String MAP_INPUT_FILE = "mapreduce.map.input.file";
+ public static final String MAP_INPUT_PATH = "mapreduce.map.input.length";
+ public static final String MAP_INPUT_START = "mapreduce.map.input.start";
+ public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
+ public static final String MAP_ENV = "mapreduce.map.env";
+ public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
+ public static final String MAP_ULIMIT = "mapreduce.map.ulimit";
+ public static final String MAP_MAX_ATTEMPTS = "mapreduce.map.maxattempts";
+ public static final String MAP_DEBUG_SCRIPT =
+ "mapreduce.map.debug.script";
+ public static final String MAP_SPECULATIVE = "mapreduce.map.speculative";
+ public static final String MAP_FAILURES_MAX_PERCENT =
+ "mapreduce.map.failures.maxpercent";
+ public static final String MAP_SKIP_INCR_PROC_COUNT =
+ "mapreduce.map.skip.proc-count.auto-incr";
+ public static final String MAP_SKIP_MAX_RECORDS =
+ "mapreduce.map.skip.maxrecords";
+ public static final String MAP_COMBINE_MIN_SPISS =
+ "mapreduce.map.combine.minspills";
+ public static final String MAP_OUTPUT_COMPRESS =
+ "mapreduce.map.output.compress";
+ public static final String MAP_OUTPUT_COMPRESS_CODEC =
+ "mapreduce.map.output.compress.codec";
+ public static final String MAP_OUTPUT_KEY_CLASS =
+ "mapreduce.map.output.key.class";
+ public static final String MAP_OUTPUT_VALUE_CLASS =
+ "mapreduce.map.output.value.class";
+ public static final String MAP_OUTPUT_KEY_FIELD_SEPERATOR =
+ "mapreduce.map.output.key.field.separator";
+ public static final String MAP_LOG_LEVEL = "mapreduce.map.log.level";
+
+ public static final String REDUCE_LOG_LEVEL =
+ "mapreduce.reduce.log.level";
+ public static final String REDUCE_MERGE_INMEM_THRESHOLD =
+ "mapreduce.reduce.merge.inmem.threshold";
+ public static final String REDUCE_INPUT_BUFFER_PERCENT =
+ "mapreduce.reduce.input.buffer.percent";
+ public static final String REDUCE_MARKRESET_BUFFER_PERCENT =
+ "mapreduce.reduce.markreset.buffer.percent";
+ public static final String REDUCE_MARKRESET_BUFFER_SIZE =
+ "mapreduce.reduce.markreset.buffer.size";
+ public static final String REDUCE_MEMORY_MB =
+ "mapreduce.reduce.memory.mb";
+ public static final String REDUCE_MEMORY_TOTAL_BYTES =
+ "mapreduce.reduce.memory.totalbytes";
+ public static final String SHUFFLE_INPUT_BUFFER_PERCENT =
+ "mapreduce.reduce.shuffle.input.buffer.percent";
+ public static final String SHUFFLE_MERGE_EPRCENT =
+ "mapreduce.reduce.shuffle.merge.percent";
+ public static final String REDUCE_FAILURES_MAXPERCENT =
+ "mapreduce.reduce.failures.maxpercent";
+ public static final String REDUCE_ENV = "mapreduce.reduce.env";
+ public static final String REDUCE_JAVA_OPTS =
+ "mapreduce.reduce.java.opts";
+ public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit";
+ public static final String REDUCE_MAX_ATTEMPTS =
+ "mapreduce.reduce.maxattempts";
+ public static final String SHUFFLE_PARALLEL_COPIES =
+ "mapreduce.reduce.shuffle.parallelcopies";
+ public static final String REDUCE_DEBUG_SCRIPT =
+ "mapreduce.reduce.debug.script";
+ public static final String REDUCE_SPECULATIVE =
+ "mapreduce.reduce.speculative";
+ public static final String SHUFFLE_CONNECT_TIMEOUT =
+ "mapreduce.reduce.shuffle.connect.timeout";
+ public static final String SHUFFLE_READ_TIMEOUT =
+ "mapreduce.reduce.shuffle.read.timeout";
+ public static final String REDUCE_SKIP_INCR_PROC_COUNT =
+ "mapreduce.reduce.skip.proc-count.auto-incr";
+ public static final String REDUCE_SKIP_MAXGROUPS =
+ "mapreduce.reduce.skip.maxgroups";
+ public static final String REDUCE_MEMTOMEM_THRESHOLD =
+ "mapreduce.reduce.merge.memtomem.threshold";
+ public static final String REDUCE_MEMTOMEM_ENABLED =
+ "mapreduce.reduce.merge.memtomem.enabled";
+ public static final String JOB_TOKEN_FILE = "mapreduce.job.jobTokenFile";
/**
* Return the configuration for the job.
* @return the shared configuration object
*/
- public Configuration getConfiguration() {
- return conf;
- }
+ public Configuration getConfiguration();
/**
* Get the unique ID for the job.
* @return the object with the job id
*/
- public JobID getJobID() {
- return jobId;
- }
+ public JobID getJobID();
/**
* Get configured the number of reduce tasks for this job. Defaults to
* <code>1</code>.
* @return the number of reduce tasks for this job.
*/
- public int getNumReduceTasks() {
- return conf.getNumReduceTasks();
- }
+ public int getNumReduceTasks();
/**
* Get the current working directory for the default file system.
*
* @return the directory name.
*/
- public Path getWorkingDirectory() throws IOException {
- return conf.getWorkingDirectory();
- }
+ public Path getWorkingDirectory() throws IOException;
/**
* Get the key class for the job output data.
* @return the key class for the job output data.
*/
- public Class<?> getOutputKeyClass() {
- return conf.getOutputKeyClass();
- }
+ public Class<?> getOutputKeyClass();
/**
* Get the value class for job outputs.
* @return the value class for job outputs.
*/
- public Class<?> getOutputValueClass() {
- return conf.getOutputValueClass();
- }
+ public Class<?> getOutputValueClass();
/**
* Get the key class for the map output data. If it is not set, use the
@@ -109,9 +266,7 @@
* different than the final output key class.
* @return the map output key class.
*/
- public Class<?> getMapOutputKeyClass() {
- return conf.getMapOutputKeyClass();
- }
+ public Class<?> getMapOutputKeyClass();
/**
* Get the value class for the map output data. If it is not set, use the
@@ -120,9 +275,7 @@
*
* @return the map output value class.
*/
- public Class<?> getMapOutputValueClass() {
- return conf.getMapOutputValueClass();
- }
+ public Class<?> getMapOutputValueClass();
/**
* Get the user-specified job name. This is only used to identify the
@@ -130,98 +283,68 @@
*
* @return the job's name, defaulting to "".
*/
- public String getJobName() {
- return conf.getJobName();
- }
+ public String getJobName();
/**
* Get the {@link InputFormat} class for the job.
*
* @return the {@link InputFormat} class for the job.
*/
- @SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
- throws ClassNotFoundException {
- return (Class<? extends InputFormat<?,?>>)
- conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
- }
+ throws ClassNotFoundException;
/**
* Get the {@link Mapper} class for the job.
*
* @return the {@link Mapper} class for the job.
*/
- @SuppressWarnings("unchecked")
public Class<? extends Mapper<?,?,?,?>> getMapperClass()
- throws ClassNotFoundException {
- return (Class<? extends Mapper<?,?,?,?>>)
- conf.getClass(MAP_CLASS_ATTR, Mapper.class);
- }
+ throws ClassNotFoundException;
/**
* Get the combiner class for the job.
*
* @return the combiner class for the job.
*/
- @SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
- throws ClassNotFoundException {
- return (Class<? extends Reducer<?,?,?,?>>)
- conf.getClass(COMBINE_CLASS_ATTR, null);
- }
+ throws ClassNotFoundException;
/**
* Get the {@link Reducer} class for the job.
*
* @return the {@link Reducer} class for the job.
*/
- @SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getReducerClass()
- throws ClassNotFoundException {
- return (Class<? extends Reducer<?,?,?,?>>)
- conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
- }
+ throws ClassNotFoundException;
/**
* Get the {@link OutputFormat} class for the job.
*
* @return the {@link OutputFormat} class for the job.
*/
- @SuppressWarnings("unchecked")
public Class<? extends OutputFormat<?,?>> getOutputFormatClass()
- throws ClassNotFoundException {
- return (Class<? extends OutputFormat<?,?>>)
- conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
- }
+ throws ClassNotFoundException;
/**
* Get the {@link Partitioner} class for the job.
*
* @return the {@link Partitioner} class for the job.
*/
- @SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
- throws ClassNotFoundException {
- return (Class<? extends Partitioner<?,?>>)
- conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
- }
+ throws ClassNotFoundException;
/**
* Get the {@link RawComparator} comparator used to compare keys.
*
* @return the {@link RawComparator} comparator used to compare keys.
*/
- public RawComparator<?> getSortComparator() {
- return conf.getOutputKeyComparator();
- }
+ public RawComparator<?> getSortComparator();
/**
* Get the pathname of the job's jar.
* @return the pathname
*/
- public String getJar() {
- return conf.getJar();
- }
+ public String getJar();
/**
* Get the user defined {@link RawComparator} comparator for
@@ -230,18 +353,123 @@
* @return comparator set by the user for grouping values.
* @see Job#setGroupingComparatorClass(Class) for details.
*/
- public RawComparator<?> getGroupingComparator() {
- return conf.getOutputValueGroupingComparator();
- }
+ public RawComparator<?> getGroupingComparator();
/**
* Get whether job-setup and job-cleanup is needed for the job
*
* @return boolean
*/
- public boolean getJobSetupCleanupNeeded() {
- return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
- }
+ public boolean getJobSetupCleanupNeeded();
+
+ /**
+ * Get whether the task profiling is enabled.
+ * @return true if some tasks will be profiled
+ */
+ public boolean getProfileEnabled();
+
+ /**
+ * Get the profiler configuration arguments.
+ *
+ * The default value for this property is
+ * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
+ *
+ * @return the parameters to pass to the task child to configure profiling
+ */
+ public String getProfileParams();
+
+ /**
+ * Get the range of maps or reduces to profile.
+ * @param isMap is the task a map?
+ * @return the task ranges
+ */
+ public IntegerRanges getProfileTaskRange(boolean isMap);
+
+ /**
+ * Get the reported username for this job.
+ *
+ * @return the username
+ */
+ public String getUser();
+
+ /**
+ * Check whether symlinks are to be created for the localized cache
+ * files in the current working directory.
+ * @return true if symlinks are to be created, false otherwise
+ */
+ public boolean getSymlink();
+
+ /**
+ * Get the archive entries in classpath as an array of Path
+ */
+ public Path[] getArchiveClassPaths();
+
+ /**
+ * Get cache archives set in the Configuration
+ * @return A URI array of the caches set in the Configuration
+ * @throws IOException
+ */
+ public URI[] getCacheArchives() throws IOException;
+ /**
+ * Get cache files set in the Configuration
+ * @return A URI array of the files set in the Configuration
+ * @throws IOException
+ */
+
+ public URI[] getCacheFiles() throws IOException;
+
+ /**
+ * Return the path array of the localized caches
+ * @return A path array of localized caches
+ * @throws IOException
+ */
+ public Path[] getLocalCacheArchives() throws IOException;
+
+ /**
+ * Return the path array of the localized files
+ * @return A path array of localized files
+ * @throws IOException
+ */
+ public Path[] getLocalCacheFiles() throws IOException;
+
+ /**
+ * Get the file entries in classpath as an array of Path
+ */
+ public Path[] getFileClassPaths();
+
+ /**
+ * Get the timestamps of the archives. Used by internal
+ * DistributedCache and MapReduce code.
+ *
+ * @return a string array of timestamps
+ */
+ public String[] getArchiveTimestamps();
+
+ /**
+ * Get the timestamps of the files. Used by internal
+ * DistributedCache and MapReduce code.
+ *
+ * @return a string array of timestamps
+ */
+ public String[] getFileTimestamps();
+
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * map task, as specified by the <code>mapreduce.map.maxattempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per map task.
+ */
+ public int getMaxMapAttempts();
+
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * reduce task, as specified by the <code>mapreduce.reduce.maxattempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per reduce task.
+ */
+ public int getMaxReduceAttempts();
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/MapContext.java Sat Nov 28 20:26:01 2009
@@ -18,9 +18,8 @@
package org.apache.hadoop.mapreduce;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
/**
* The context that is given to the {@link Mapper}.
@@ -29,43 +28,15 @@
* @param <KEYOUT> the key output type from the Mapper
* @param <VALUEOUT> the value output type from the Mapper
*/
-public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- private RecordReader<KEYIN,VALUEIN> reader;
- private InputSplit split;
-
- public MapContext(Configuration conf, TaskAttemptID taskid,
- RecordReader<KEYIN,VALUEIN> reader,
- RecordWriter<KEYOUT,VALUEOUT> writer,
- OutputCommitter committer,
- StatusReporter reporter,
- InputSplit split) {
- super(conf, taskid, writer, committer, reporter);
- this.reader = reader;
- this.split = split;
- }
/**
* Get the input split for this map.
*/
- public InputSplit getInputSplit() {
- return split;
- }
-
- @Override
- public KEYIN getCurrentKey() throws IOException, InterruptedException {
- return reader.getCurrentKey();
- }
-
- @Override
- public VALUEIN getCurrentValue() throws IOException, InterruptedException {
- return reader.getCurrentValue();
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return reader.nextKeyValue();
- }
-
+ public InputSplit getInputSplit();
+
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Mapper.java Sat Nov 28 20:26:01 2009
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
/**
* Maps input key/value pairs to a set of intermediate key/value pairs.
@@ -94,16 +95,11 @@
*/
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
- public class Context
- extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- public Context(Configuration conf, TaskAttemptID taskid,
- RecordReader<KEYIN,VALUEIN> reader,
- RecordWriter<KEYOUT,VALUEOUT> writer,
- OutputCommitter committer,
- StatusReporter reporter,
- InputSplit split) throws IOException, InterruptedException {
- super(conf, taskid, reader, writer, committer, reporter, split);
- }
+ /**
+ * The <code>Context</code> passed on to the {@link Mapper} implementations.
+ */
+ public abstract class Context
+ implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/OutputCommitter.java Sat Nov 28 20:26:01 2009
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce;
import java.io.IOException;
-
/**
* <code>OutputCommitter</code> describes the commit of task output for a
* Map-Reduce job.
@@ -69,9 +68,38 @@
*
* @param jobContext Context of the job whose output is being written.
* @throws IOException
+ * @deprecated Use {@link #commitJob(JobContext)} or
+ * {@link #abortJob(JobContext, JobStatus.State)} instead.
+ */
+ @Deprecated
+ public void cleanupJob(JobContext jobContext) throws IOException { }
+
+ /**
+ * For committing job's output after successful job completion. Note that this
+ * is invoked for jobs with final runstate as SUCCESSFUL.
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @throws IOException
*/
- public abstract void cleanupJob(JobContext jobContext) throws IOException;
+ public void commitJob(JobContext jobContext) throws IOException {
+ cleanupJob(jobContext);
+ }
+
+ /**
+ * For aborting an unsuccessful job's output. Note that this is invoked for
+ * jobs with final runstate as {@link JobStatus.State#FAILED} or
+ * {@link JobStatus.State#KILLED}.
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @param state final runstate of the job
+ * @throws IOException
+ */
+ public void abortJob(JobContext jobContext, JobStatus.State state)
+ throws IOException {
+ cleanupJob(jobContext);
+ }
+
/**
* Sets up output for the task.
*
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/ReduceContext.java Sat Nov 28 20:26:01 2009
@@ -18,22 +18,11 @@
package org.apache.hadoop.mapreduce;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
-import java.util.NoSuchElementException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.mapred.BackupStore;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
/**
* The context passed to the {@link Reducer}.
@@ -42,321 +31,32 @@
* @param <KEYOUT> the class of the output keys
* @param <VALUEOUT> the class of the output values
*/
-public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- private RawKeyValueIterator input;
- private Counter inputCounter;
- private RawComparator<KEYIN> comparator;
- private KEYIN key; // current key
- private VALUEIN value; // current value
- private boolean firstValue = false; // first value in key
- private boolean nextKeyIsSame = false; // more w/ this key
- private boolean hasMore; // more in file
- protected Progressable reporter;
- private Deserializer<KEYIN> keyDeserializer;
- private Deserializer<VALUEIN> valueDeserializer;
- private DataInputBuffer buffer = new DataInputBuffer();
- private BytesWritable currentRawKey = new BytesWritable();
- private ValueIterable iterable = new ValueIterable();
- private boolean isMarked = false;
- private BackupStore<KEYIN,VALUEIN> backupStore;
- private final SerializationFactory serializationFactory;
- private final Class<KEYIN> keyClass;
- private final Class<VALUEIN> valueClass;
- private final Configuration conf;
- private final TaskAttemptID taskid;
- private int currentKeyLength = -1;
- private int currentValueLength = -1;
-
- public ReduceContext(Configuration conf, TaskAttemptID taskid,
- RawKeyValueIterator input,
- Counter inputCounter,
- RecordWriter<KEYOUT,VALUEOUT> output,
- OutputCommitter committer,
- StatusReporter reporter,
- RawComparator<KEYIN> comparator,
- Class<KEYIN> keyClass,
- Class<VALUEIN> valueClass
- ) throws InterruptedException, IOException{
- super(conf, taskid, output, committer, reporter);
- this.input = input;
- this.inputCounter = inputCounter;
- this.comparator = comparator;
- this.serializationFactory = new SerializationFactory(conf);
- this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
- this.keyDeserializer.open(buffer);
- this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
- this.valueDeserializer.open(buffer);
- hasMore = input.next();
- this.keyClass = keyClass;
- this.valueClass = valueClass;
- this.conf = conf;
- this.taskid = taskid;
- }
- public RawKeyValueIterator getIterator() {
- return input;
- }
-
- public Counter getInputCounter() {
- return inputCounter;
- }
-
- public RawComparator<KEYIN> getComparator() {
- return comparator;
- }
-
- public boolean hasMore() {
- return hasMore;
- }
-
/** Start processing next unique key. */
- public boolean nextKey() throws IOException,InterruptedException {
- while (hasMore && nextKeyIsSame) {
- nextKeyValue();
- }
- if (hasMore) {
- return nextKeyValue();
- } else {
- return false;
- }
- }
+ public boolean nextKey() throws IOException,InterruptedException;
/**
- * Advance to the next key/value pair.
+ * Iterate through the values for the current key, reusing the same value
+ * object, which is stored in the context.
+ * @return the series of values associated with the current key. All of the
+ * objects returned directly and indirectly from this method are reused.
*/
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (!hasMore) {
- key = null;
- value = null;
- return false;
- }
- firstValue = !nextKeyIsSame;
- DataInputBuffer nextKey = input.getKey();
- currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
- nextKey.getLength() - nextKey.getPosition());
- buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
- key = keyDeserializer.deserialize(key);
- DataInputBuffer nextVal = input.getValue();
- buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
- value = valueDeserializer.deserialize(value);
-
- currentKeyLength = nextKey.getLength() - nextKey.getPosition();
- currentValueLength = nextVal.getLength() - nextVal.getPosition();
-
- if (isMarked) {
- backupStore.write(nextKey, nextVal);
- }
-
- hasMore = input.next();
- inputCounter.increment(1);
- if (hasMore) {
- nextKey = input.getKey();
- nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
- currentRawKey.getLength(),
- nextKey.getData(),
- nextKey.getPosition(),
- nextKey.getLength() - nextKey.getPosition()
- ) == 0;
- } else {
- nextKeyIsSame = false;
- }
- return true;
- }
-
- public KEYIN getCurrentKey() {
- return key;
- }
-
- @Override
- public VALUEIN getCurrentValue() {
- return value;
- }
-
-
-
- protected class ValueIterator implements MarkableIteratorInterface<VALUEIN> {
-
- private boolean inReset = false;
- private boolean clearMarkFlag = false;
-
- @Override
- public boolean hasNext() {
- try {
- if (inReset && backupStore.hasNext()) {
- return true;
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw new RuntimeException("hasNext failed", e);
- }
- return firstValue || nextKeyIsSame;
- }
+ public Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
- @Override
- public VALUEIN next() {
- if (inReset) {
- try {
- if (backupStore.hasNext()) {
- backupStore.next();
- DataInputBuffer next = backupStore.nextValue();
- buffer.reset(next.getData(), next.getPosition(), next.getLength());
- value = valueDeserializer.deserialize(value);
- return value;
- } else {
- inReset = false;
- backupStore.exitResetMode();
- if (clearMarkFlag) {
- clearMarkFlag = false;
- isMarked = false;
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException("next value iterator failed", e);
- }
- }
-
- // if this is the first record, we don't need to advance
- if (firstValue) {
- firstValue = false;
- return value;
- }
- // if this isn't the first record and the next key is different, they
- // can't advance it here.
- if (!nextKeyIsSame) {
- throw new NoSuchElementException("iterate past last value");
- }
- // otherwise, go to the next key/value pair
- try {
- nextKeyValue();
- return value;
- } catch (IOException ie) {
- throw new RuntimeException("next value iterator failed", ie);
- } catch (InterruptedException ie) {
- // this is bad, but we can't modify the exception list of java.util
- throw new RuntimeException("next value iterator interrupted", ie);
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove not implemented");
- }
-
- @Override
- public void mark() throws IOException {
- if (backupStore == null) {
- backupStore = new BackupStore<KEYIN,VALUEIN>(conf, taskid);
- }
- isMarked = true;
- if (!inReset) {
- backupStore.reinitialize();
- if (currentKeyLength == -1) {
- // The user has not called next() for this iterator yet, so
- // there is no current record to mark and copy to backup store.
- return;
- }
- assert (currentValueLength != -1);
- int requestedSize = currentKeyLength + currentValueLength +
- WritableUtils.getVIntSize(currentKeyLength) +
- WritableUtils.getVIntSize(currentValueLength);
- DataOutputStream out = backupStore.getOutputStream(requestedSize);
- writeFirstKeyValueBytes(out);
- backupStore.updateCounters(requestedSize);
- } else {
- backupStore.mark();
- }
- }
-
- @Override
- public void reset() throws IOException {
- // We reached the end of an iteration and user calls a
- // reset, but a clearMark was called before, just throw
- // an exception
- if (clearMarkFlag) {
- clearMarkFlag = false;
- backupStore.clearMark();
- throw new IOException("Reset called without a previous mark");
- }
-
- if (!isMarked) {
- throw new IOException("Reset called without a previous mark");
- }
- inReset = true;
- backupStore.reset();
- }
+ /**
+ * {@link Iterator} to iterate over values for a given group of records.
+ */
+ interface ValueIterator<VALUEIN> extends MarkableIteratorInterface<VALUEIN> {
- @Override
- public void clearMark() throws IOException {
- if (backupStore == null) {
- return;
- }
- if (inReset) {
- clearMarkFlag = true;
- backupStore.clearMark();
- } else {
- inReset = isMarked = false;
- backupStore.reinitialize();
- }
- }
-
/**
* This method is called when the reducer moves from one key to
* another.
* @throws IOException
*/
- void resetBackupStore() throws IOException {
- if (backupStore == null) {
- return;
- }
- inReset = isMarked = false;
- backupStore.reinitialize();
- currentKeyLength = -1;
- }
-
- /**
- * This method is called to write the record that was most recently
- * served (before a call to the mark). Since the framework reads one
- * record in advance, to get this record, we serialize the current key
- * and value
- * @param out
- * @throws IOException
- */
- private void writeFirstKeyValueBytes(DataOutputStream out)
- throws IOException {
- assert (getCurrentKey() != null && getCurrentValue() != null);
- WritableUtils.writeVInt(out, currentKeyLength);
- WritableUtils.writeVInt(out, currentValueLength);
- Serializer<KEYIN> keySerializer =
- serializationFactory.getSerializer(keyClass);
- keySerializer.open(out);
- keySerializer.serialize(getCurrentKey());
-
- Serializer<VALUEIN> valueSerializer =
- serializationFactory.getSerializer(valueClass);
- valueSerializer.open(out);
- valueSerializer.serialize(getCurrentValue());
- }
- }
-
- protected class ValueIterable implements Iterable<VALUEIN> {
- private ValueIterator iterator = new ValueIterator();
- @Override
- public Iterator<VALUEIN> iterator() {
- return iterator;
- }
- }
-
- /**
- * Iterate through the values for the current key, reusing the same value
- * object, which is stored in the context.
- * @return the series of values associated with the current key. All of the
- * objects returned directly and indirectly from this method are reused.
- */
- public
- Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
- return iterable;
+ void resetBackupStore() throws IOException;
}
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/Reducer.java Sat Nov 28 20:26:01 2009
@@ -117,21 +117,11 @@
*/
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- public class Context
- extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- public Context(Configuration conf, TaskAttemptID taskid,
- RawKeyValueIterator input,
- Counter inputCounter,
- RecordWriter<KEYOUT,VALUEOUT> output,
- OutputCommitter committer,
- StatusReporter reporter,
- RawComparator<KEYIN> comparator,
- Class<KEYIN> keyClass,
- Class<VALUEIN> valueClass
- ) throws IOException, InterruptedException {
- super(conf, taskid, input, inputCounter, output, committer, reporter,
- comparator, keyClass, valueClass);
- }
+ /**
+ * The <code>Context</code> passed on to the {@link Reducer} implementations.
+ */
+ public abstract class Context
+ implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Sat Nov 28 20:26:01 2009
@@ -18,49 +18,31 @@
package org.apache.hadoop.mapreduce;
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Progressable;
/**
* The context for task attempts.
*/
-public class TaskAttemptContext extends JobContext implements Progressable {
- private final TaskAttemptID taskId;
- private String status = "";
-
- public TaskAttemptContext(Configuration conf,
- TaskAttemptID taskId) {
- super(conf, taskId.getJobID());
- this.taskId = taskId;
- }
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TaskAttemptContext extends JobContext, Progressable {
/**
* Get the unique name for this task attempt.
*/
- public TaskAttemptID getTaskAttemptID() {
- return taskId;
- }
+ public TaskAttemptID getTaskAttemptID();
/**
* Set the current status of the task to the given string.
*/
- public void setStatus(String msg) throws IOException {
- status = msg;
- }
+ public void setStatus(String msg);
/**
* Get the last set status message.
* @return the current status message
*/
- public String getStatus() {
- return status;
- }
+ public String getStatus();
- /**
- * Report progress. The subtypes actually do work in this method.
- */
- public void progress() {
- }
}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Sat Nov 28 20:26:01 2009
@@ -20,8 +20,8 @@
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
/**
* A context object that allows input and output from the task. It is only
@@ -31,28 +31,16 @@
* @param <KEYOUT> the output key type for the task
* @param <VALUEOUT> the output value type for the task
*/
-public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
- extends TaskAttemptContext implements Progressable {
- private RecordWriter<KEYOUT,VALUEOUT> output;
- private StatusReporter reporter;
- private OutputCommitter committer;
-
- public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
- RecordWriter<KEYOUT,VALUEOUT> output,
- OutputCommitter committer,
- StatusReporter reporter) {
- super(conf, taskid);
- this.output = output;
- this.reporter = reporter;
- this.committer = committer;
- }
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+ extends TaskAttemptContext {
/**
* Advance to the next key, value pair, returning null if at end.
* @return the key object that was read into, or null if no more
*/
- public abstract
- boolean nextKeyValue() throws IOException, InterruptedException;
+ public boolean nextKeyValue() throws IOException, InterruptedException;
/**
* Get the current key.
@@ -60,8 +48,7 @@
* @throws IOException
* @throws InterruptedException
*/
- public abstract
- KEYIN getCurrentKey() throws IOException, InterruptedException;
+ public KEYIN getCurrentKey() throws IOException, InterruptedException;
/**
* Get the current value.
@@ -69,36 +56,33 @@
* @throws IOException
* @throws InterruptedException
*/
- public abstract VALUEIN getCurrentValue() throws IOException,
- InterruptedException;
+ public VALUEIN getCurrentValue() throws IOException, InterruptedException;
/**
* Generate an output key/value pair.
*/
- public void write(KEYOUT key, VALUEOUT value
- ) throws IOException, InterruptedException {
- output.write(key, value);
- }
-
- public Counter getCounter(Enum<?> counterName) {
- return reporter.getCounter(counterName);
- }
-
- public Counter getCounter(String groupName, String counterName) {
- return reporter.getCounter(groupName, counterName);
- }
-
- @Override
- public void progress() {
- reporter.progress();
- }
-
- @Override
- public void setStatus(String status) {
- reporter.setStatus(status);
- }
-
- public OutputCommitter getOutputCommitter() {
- return committer;
- }
+ public void write(KEYOUT key, VALUEOUT value)
+ throws IOException, InterruptedException;
+
+ /**
+ * Get the {@link Counter} for the given <code>counterName</code>.
+ * @param counterName counter name
+ * @return the <code>Counter</code> for the given <code>counterName</code>
+ */
+ public Counter getCounter(Enum<?> counterName);
+
+ /**
+ * Get the {@link Counter} for the given <code>groupName</code> and
+ * <code>counterName</code>.
+ * @param counterName counter name
+ * @return the <code>Counter</code> for the given <code>groupName</code> and
+ * <code>counterName</code>
+ */
+ public Counter getCounter(String groupName, String counterName);
+
+ /**
+ * Get the {@link OutputCommitter} for the task-attempt.
+ * @return the <code>OutputCommitter</code> for the task-attempt
+ */
+ public OutputCommitter getOutputCommitter();
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Sat Nov 28 20:26:01 2009
@@ -24,6 +24,8 @@
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
import java.net.URI;
@@ -125,6 +127,7 @@
* @see org.apache.hadoop.mapred.JobConf
* @see org.apache.hadoop.mapred.JobClient
*/
+@Deprecated
public class DistributedCache {
/**
* Get the locally cached file or archive; it could either be
@@ -148,9 +151,10 @@
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
+ * @deprecated Internal to MapReduce framework.
+ * Use TrackerDistributedCacheManager instead.
*/
+ @Deprecated
public static Path getLocalCache(URI cache, Configuration conf,
Path baseDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
@@ -185,16 +189,17 @@
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
+ * @deprecated Internal to MapReduce framework.
+ * Use TrackerDistributedCacheManager instead.
*/
+ @Deprecated
public static Path getLocalCache(URI cache, Configuration conf,
Path baseDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
return new TrackerDistributedCacheManager(conf).getLocalCache(cache, conf,
- baseDir, fileStatus, isArchive, confFileStamp, currentWorkDir,
+ baseDir.toString(), fileStatus, isArchive, confFileStamp, currentWorkDir,
honorSymLinkConf);
}
@@ -219,9 +224,10 @@
* @return the path to directory where the archives are unjarred in case of archives,
* the path to the file where the file is copied locally
* @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
+ * @deprecated Internal to MapReduce framework.
+ * Use TrackerDistributedCacheManager instead.
*/
+ @Deprecated
public static Path getLocalCache(URI cache, Configuration conf,
Path baseDir, boolean isArchive,
long confFileStamp, Path currentWorkDir)
@@ -238,15 +244,44 @@
* @param conf configuration which contains the filesystem the cache
* is contained in.
* @throws IOException
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
+ * @deprecated Internal to MapReduce framework.
+ * Use TrackerDistributedCacheManager instead.
*/
+ @Deprecated
public static void releaseCache(URI cache, Configuration conf)
throws IOException {
- new TrackerDistributedCacheManager(conf).releaseCache(cache, conf);
+ // find the timestamp of the uri
+ URI[] archives = DistributedCache.getCacheArchives(conf);
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ String[] archivesTimestamps =
+ DistributedCache.getArchiveTimestamps(conf);
+ String[] filesTimestamps =
+ DistributedCache.getFileTimestamps(conf);
+ String timestamp = null;
+ if (archives != null) {
+ for (int i = 0; i < archives.length; i++) {
+ if (archives[i].equals(cache)) {
+ timestamp = archivesTimestamps[i];
+ break;
+ }
+ }
+ }
+ if (timestamp == null && files != null) {
+ for (int i = 0; i < files.length; i++) {
+ if (files[i].equals(cache)) {
+ timestamp = filesTimestamps[i];
+ break;
+ }
+ }
+ }
+ if (timestamp == null) {
+ throw new IOException("TimeStamp of the uri couldnot be found");
+ }
+ new TrackerDistributedCacheManager(conf).releaseCache(cache, conf,
+ Long.parseLong(timestamp));
}
- /*
+ /**
* Returns the relative path of the dir this cache will be localized in
* relative path that this cache will be localized in. For
* hdfs://hostname:port/absolute_path -- the relative path is
@@ -256,6 +291,7 @@
* @deprecated Internal to MapReduce framework. Use DistributedCacheManager
* instead.
*/
+ @Deprecated
public static String makeRelative(URI cache, Configuration conf)
throws IOException {
return new TrackerDistributedCacheManager(conf).makeRelative(cache, conf);
@@ -268,13 +304,13 @@
* @param cache cache file
* @return mtime of a given cache file on hdfs
* @throws IOException
+ * @deprecated Internal to MapReduce framework.
+ * Use {@link TrackerDistributedCacheManager} instead.
*/
+ @Deprecated
public static long getTimestamp(Configuration conf, URI cache)
throws IOException {
- FileSystem fileSystem = FileSystem.get(cache, conf);
- Path filePath = new Path(cache.getPath());
-
- return fileSystem.getFileStatus(filePath).getModificationTime();
+ return TrackerDistributedCacheManager.getTimestamp(conf, cache);
}
/**
@@ -286,6 +322,7 @@
* @deprecated Internal to MapReduce framework. Use DistributedCacheManager
* instead.
*/
+ @Deprecated
public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
throws IOException{
TrackerDistributedCacheManager.createAllSymlink(conf, jobCacheDir, workDir);
@@ -296,10 +333,12 @@
* to be used by user code.
* @param archives The list of archives that need to be localized
* @param conf Configuration which will be changed
+ * @deprecated Use {@link Job#setCacheArchives(URI[])} instead
*/
+ @Deprecated
public static void setCacheArchives(URI[] archives, Configuration conf) {
String sarchives = StringUtils.uriToString(archives);
- conf.set("mapred.cache.archives", sarchives);
+ conf.set(JobContext.CACHE_ARCHIVES, sarchives);
}
/**
@@ -307,10 +346,12 @@
* used by user code.
* @param files The list of files that need to be localized
* @param conf Configuration which will be changed
+ * @deprecated Use {@link Job#setCacheFiles(URI[])} instead
*/
+ @Deprecated
public static void setCacheFiles(URI[] files, Configuration conf) {
String sfiles = StringUtils.uriToString(files);
- conf.set("mapred.cache.files", sfiles);
+ conf.set(JobContext.CACHE_FILES, sfiles);
}
/**
@@ -319,9 +360,11 @@
* @param conf The configuration which contains the archives
* @return A URI array of the caches set in the Configuration
* @throws IOException
+ * @deprecated Use {@link JobContext#getCacheArchives()} instead
*/
+ @Deprecated
public static URI[] getCacheArchives(Configuration conf) throws IOException {
- return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
+ return StringUtils.stringToURI(conf.getStrings(JobContext.CACHE_ARCHIVES));
}
/**
@@ -330,9 +373,11 @@
* @param conf The configuration which contains the files
* @return A URI array of the files set in the Configuration
* @throws IOException
+ * @deprecated Use {@link JobContext#getCacheFiles()} instead
*/
+ @Deprecated
public static URI[] getCacheFiles(Configuration conf) throws IOException {
- return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
+ return StringUtils.stringToURI(conf.getStrings(JobContext.CACHE_FILES));
}
/**
@@ -341,11 +386,13 @@
* @param conf Configuration that contains the localized archives
* @return A path array of localized caches
* @throws IOException
+ * @deprecated Use {@link JobContext#getLocalCacheArchives()} instead
*/
+ @Deprecated
public static Path[] getLocalCacheArchives(Configuration conf)
throws IOException {
return StringUtils.stringToPath(conf
- .getStrings("mapred.cache.localArchives"));
+ .getStrings(JobContext.CACHE_LOCALARCHIVES));
}
/**
@@ -354,10 +401,12 @@
* @param conf Configuration that contains the localized files
* @return A path array of localized files
* @throws IOException
+ * @deprecated Use {@link JobContext#getLocalCacheFiles()} instead
*/
+ @Deprecated
public static Path[] getLocalCacheFiles(Configuration conf)
throws IOException {
- return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
+ return StringUtils.stringToPath(conf.getStrings(JobContext.CACHE_LOCALFILES));
}
/**
@@ -366,9 +415,11 @@
* @param conf The configuration which stored the timestamps
* @return a string array of timestamps
* @throws IOException
+ * @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
*/
+ @Deprecated
public static String[] getArchiveTimestamps(Configuration conf) {
- return conf.getStrings("mapred.cache.archives.timestamps");
+ return conf.getStrings(JobContext.CACHE_ARCHIVES_TIMESTAMPS);
}
@@ -378,9 +429,11 @@
* @param conf The configuration which stored the timestamps
* @return a string array of timestamps
* @throws IOException
+ * @deprecated Use {@link JobContext#getFileTimestamps()} instead
*/
+ @Deprecated
public static String[] getFileTimestamps(Configuration conf) {
- return conf.getStrings("mapred.cache.files.timestamps");
+ return conf.getStrings(JobContext.CACHE_FILE_TIMESTAMPS);
}
/**
@@ -389,9 +442,13 @@
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of archives.
* The order should be the same as the order in which the archives are added.
+ * @deprecated Use
+ * {@link TrackerDistributedCacheManager#setArchiveTimestamps(Configuration, String)}
+ * instead
*/
+ @Deprecated
public static void setArchiveTimestamps(Configuration conf, String timestamps) {
- conf.set("mapred.cache.archives.timestamps", timestamps);
+ TrackerDistributedCacheManager.setArchiveTimestamps(conf, timestamps);
}
/**
@@ -400,9 +457,13 @@
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of files.
* The order should be the same as the order in which the files are added.
+ * @deprecated Use
+ * {@link TrackerDistributedCacheManager#setFileTimestamps(Configuration, String)}
+ * instead
*/
+ @Deprecated
public static void setFileTimestamps(Configuration conf, String timestamps) {
- conf.set("mapred.cache.files.timestamps", timestamps);
+ TrackerDistributedCacheManager.setFileTimestamps(conf, timestamps);
}
/**
@@ -410,9 +471,13 @@
* by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local archives
+ * @deprecated Use
+ * {@link TrackerDistributedCacheManager#setLocalArchives(Configuration, String)}
+ * instead
*/
+ @Deprecated
public static void setLocalArchives(Configuration conf, String str) {
- conf.set("mapred.cache.localArchives", str);
+ TrackerDistributedCacheManager.setLocalArchives(conf, str);
}
/**
@@ -420,9 +485,13 @@
* by internal DistributedCache code.
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local files
+ * @deprecated Use
+ * {@link TrackerDistributedCacheManager#setLocalFiles(Configuration, String)}
+ * instead
*/
+ @Deprecated
public static void setLocalFiles(Configuration conf, String str) {
- conf.set("mapred.cache.localFiles", str);
+ TrackerDistributedCacheManager.setLocalFiles(conf, str);
}
/**
@@ -430,10 +499,12 @@
* be used by user code.
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
+ * @deprecated Use {@link Job#addCacheArchive(URI)} instead
*/
+ @Deprecated
public static void addCacheArchive(URI uri, Configuration conf) {
- String archives = conf.get("mapred.cache.archives");
- conf.set("mapred.cache.archives", archives == null ? uri.toString()
+ String archives = conf.get(JobContext.CACHE_ARCHIVES);
+ conf.set(JobContext.CACHE_ARCHIVES, archives == null ? uri.toString()
: archives + "," + uri.toString());
}
@@ -442,10 +513,12 @@
* to be used by user code.
* @param uri The uri of the cache to be localized
* @param conf Configuration to add the cache to
+ * @deprecated Use {@link Job#addCacheFile(URI)} instead
*/
+ @Deprecated
public static void addCacheFile(URI uri, Configuration conf) {
- String files = conf.get("mapred.cache.files");
- conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
+ String files = conf.get(JobContext.CACHE_FILES);
+ conf.set(JobContext.CACHE_FILES, files == null ? uri.toString() : files + ","
+ uri.toString());
}
@@ -455,11 +528,13 @@
*
* @param file Path of the file to be added
* @param conf Configuration that contains the classpath setting
+ * @deprecated Use {@link Job#addFileToClassPath(Path)} instead
*/
+ @Deprecated
public static void addFileToClassPath(Path file, Configuration conf)
throws IOException {
- String classpath = conf.get("mapred.job.classpath.files");
- conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
+ String classpath = conf.get(JobContext.CLASSPATH_FILES);
+ conf.set(JobContext.CLASSPATH_FILES, classpath == null ? file.toString()
: classpath + "," + file.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(file).toUri();
@@ -472,10 +547,12 @@
* Used by internal DistributedCache code.
*
* @param conf Configuration that contains the classpath setting
+ * @deprecated Use {@link JobContext#getFileClassPaths()} instead
*/
+ @Deprecated
public static Path[] getFileClassPaths(Configuration conf) {
ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
- "mapred.job.classpath.files");
+ JobContext.CLASSPATH_FILES);
if (list.size() == 0) {
return null;
}
@@ -492,11 +569,13 @@
*
* @param archive Path of the archive to be added
* @param conf Configuration that contains the classpath setting
+ * @deprecated Use {@link Job#addArchiveToClassPath(Path)} instead
*/
+ @Deprecated
public static void addArchiveToClassPath(Path archive, Configuration conf)
throws IOException {
- String classpath = conf.get("mapred.job.classpath.archives");
- conf.set("mapred.job.classpath.archives", classpath == null ? archive
+ String classpath = conf.get(JobContext.CLASSPATH_ARCHIVES);
+ conf.set(JobContext.CLASSPATH_ARCHIVES, classpath == null ? archive
.toString() : classpath + "," + archive.toString());
FileSystem fs = FileSystem.get(conf);
URI uri = fs.makeQualified(archive).toUri();
@@ -509,10 +588,12 @@
* Used by internal DistributedCache code.
*
* @param conf Configuration that contains the classpath setting
+ * @deprecated Use {@link JobContext#getArchiveClassPaths()} instead
*/
+ @Deprecated
public static Path[] getArchiveClassPaths(Configuration conf) {
ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
- "mapred.job.classpath.archives");
+ JobContext.CLASSPATH_ARCHIVES);
if (list.size() == 0) {
return null;
}
@@ -527,10 +608,12 @@
* This method allows you to create symlinks in the current working directory
* of the task to all the cache files/archives.
* Intended to be used by user code.
- * @param conf the jobconf
+ * @param conf the jobconf
+ * @deprecated Use {@link Job#createSymlink()} instead
*/
+ @Deprecated
public static void createSymlink(Configuration conf){
- conf.set("mapred.create.symlink", "yes");
+ conf.set(JobContext.CACHE_SYMLINK, "yes");
}
/**
@@ -539,9 +622,11 @@
* Used by internal DistributedCache code.
* @param conf the jobconf
* @return true if symlinks are to be created- else return false
+ * @deprecated Use {@link JobContext#getSymlink()} instead
*/
+ @Deprecated
public static boolean getSymlink(Configuration conf){
- String result = conf.get("mapred.create.symlink");
+ String result = conf.get(JobContext.CACHE_SYMLINK);
if ("yes".equals(result)){
return true;
}
@@ -555,52 +640,22 @@
* the various archives and files. May be used by user code.
* @param uriFiles The uri array of urifiles
* @param uriArchives the uri array of uri archives
+ * @deprecated Use
+ * {@link TrackerDistributedCacheManager#checkURIs(URI[], URI[])} instead
*/
+ @Deprecated
public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){
- if ((uriFiles == null) && (uriArchives == null)){
- return true;
- }
- if (uriFiles != null){
- for (int i = 0; i < uriFiles.length; i++){
- String frag1 = uriFiles[i].getFragment();
- if (frag1 == null)
- return false;
- for (int j=i+1; j < uriFiles.length; j++){
- String frag2 = uriFiles[j].getFragment();
- if (frag2 == null)
- return false;
- if (frag1.equalsIgnoreCase(frag2))
- return false;
- }
- if (uriArchives != null){
- for (int j = 0; j < uriArchives.length; j++){
- String frag2 = uriArchives[j].getFragment();
- if (frag2 == null){
- return false;
- }
- if (frag1.equalsIgnoreCase(frag2))
- return false;
- for (int k=j+1; k < uriArchives.length; k++){
- String frag3 = uriArchives[k].getFragment();
- if (frag3 == null)
- return false;
- if (frag2.equalsIgnoreCase(frag3))
- return false;
- }
- }
- }
- }
- }
- return true;
+ return TrackerDistributedCacheManager.checkURIs(uriFiles, uriArchives);
}
/**
* Clear the entire contents of the cache and delete the backing files. This
* should only be used when the server is reinitializing, because the users
* are going to lose their files.
- * @deprecated Internal to MapReduce framework. Use DistributedCacheManager
- * instead.
+ * @deprecated Internal to MapReduce framework.
+ * Use TrackerDistributedCacheManager instead.
*/
+ @Deprecated
public static void purgeCache(Configuration conf) throws IOException {
new TrackerDistributedCacheManager(conf).purgeCache();
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Sat Nov 28 20:26:01 2009
@@ -30,10 +30,9 @@
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
@@ -97,7 +96,7 @@
Map<String, Path> classPaths = new HashMap<String, Path>();
if (paths != null) {
for (Path p : paths) {
- classPaths.put(p.toString(), p);
+ classPaths.put(p.toUri().getPath(), p); // key on the URI path component only (no scheme/authority); getPath() already returns a String
}
}
for (int i = 0; i < uris.length; ++i) {
@@ -152,13 +151,9 @@
URI uri = cacheFile.uri;
FileSystem fileSystem = FileSystem.get(uri, taskConf);
FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
- String cacheId = this.distributedCacheManager.makeRelative(uri, taskConf);
- String cachePath = cacheSubdir + Path.SEPARATOR + cacheId;
- Path localPath = lDirAlloc.getLocalPathForWrite(cachePath,
- fileStatus.getLen(), taskConf);
- String baseDir = localPath.toString().replace(cacheId, "");
+
Path p = distributedCacheManager.getLocalCache(uri, taskConf,
- new Path(baseDir), fileStatus,
+ cacheSubdir, fileStatus,
cacheFile.type == CacheFile.FileType.ARCHIVE,
cacheFile.timestamp, workdirPath, false);
@@ -174,11 +169,12 @@
// Update the configuration object with localized data.
if (!localArchives.isEmpty()) {
- DistributedCache.setLocalArchives(taskConf,
+ TrackerDistributedCacheManager.setLocalArchives(taskConf,
stringifyPathList(localArchives));
}
if (!localFiles.isEmpty()) {
- DistributedCache.setLocalFiles(taskConf, stringifyPathList(localFiles));
+ TrackerDistributedCacheManager.setLocalFiles(taskConf,
+ stringifyPathList(localFiles));
}
}
@@ -214,7 +210,7 @@
*/
public void release() throws IOException {
for (CacheFile c : cacheFiles) {
- distributedCacheManager.releaseCache(c.uri, taskConf);
+ distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
}
}
@@ -222,7 +218,7 @@
* Creates a class loader that includes the designated
* files and archives.
*/
- public ClassLoader makeClassLoader(final ClassLoader parent)
+ public ClassLoader makeClassLoader(final ClassLoader parent)
throws MalformedURLException {
final URL[] urls = new URL[classPaths.size()];
for (int i = 0; i < classPaths.size(); ++i) {
@@ -232,7 +228,7 @@
@Override
public ClassLoader run() {
return new URLClassLoader(urls, parent);
- }
+ }
});
}
}