You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/12/15 23:21:35 UTC
svn commit: r726850 [3/4] - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/conf/ src/core/org/apache/hadoop/io/
src/core/org/apache/hadoop/util/ src/examples/org/apache/hadoop/examples/
src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/h...
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/CounterGroup.java Mon Dec 15 14:21:32 2008
@@ -18,10 +18,167 @@
package org.apache.hadoop.mapreduce;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.MissingResourceException;
+import java.util.ResourceBundle;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
/**
* A group of {@link Counter}s that logically belong together. Typically,
* it is an {@link Enum} subclass and the counters are the values.
*/
-public abstract class CounterGroup implements Iterable<Counter> {
- abstract public String getName();
+public class CounterGroup implements Writable, Iterable<Counter> {
+  /** Internal name of the group; also names the optional resource bundle. */
+  private String name;
+  /** Human-readable name of the group. */
+  private String displayName;
+  /** The counters of this group, keyed and sorted by counter name. */
+  private final TreeMap<String, Counter> counters =
+      new TreeMap<String, Counter>();
+  /** Optional ResourceBundle for localization of group and counter names. */
+  private ResourceBundle bundle = null;
+
+  /**
+   * Returns the resource bundle named after the given class name, with every
+   * '$' replaced by '_'.
+   * @param enumClassName the class name the bundle is named after
+   * @return the resource bundle
+   * @throws MissingResourceException if the bundle isn't found
+   */
+  private static ResourceBundle getResourceBundle(String enumClassName) {
+    String bundleName = enumClassName.replace('$', '_');
+    return ResourceBundle.getBundle(bundleName);
+  }
+
+  /**
+   * Creates an empty group. The display name is looked up under the key
+   * "CounterGroupName" in the resource bundle named after the group, and
+   * falls back to the group name when there is no such bundle or key.
+   * @param name the internal name of the group
+   */
+  protected CounterGroup(String name) {
+    this.name = name;
+    try {
+      bundle = getResourceBundle(name);
+    } catch (MissingResourceException ignored) {
+      // Localization is optional; without a bundle the raw names are used.
+    }
+    displayName = localize("CounterGroupName", name);
+  }
+
+  /**
+   * Creates an empty group with an explicit display name. No resource bundle
+   * is loaded, so counters created later by {@link #findCounter(String)} are
+   * displayed under their raw names.
+   * @param name the internal name of the group
+   * @param displayName the human-readable name of the group
+   */
+  protected CounterGroup(String name, String displayName) {
+    this.name = name;
+    this.displayName = displayName;
+  }
+
+  /**
+   * Get the internal name of the group
+   * @return the internal name
+   */
+  public synchronized String getName() {
+    return name;
+  }
+
+  /**
+   * Get the display name of the group.
+   * @return the human readable name
+   */
+  public synchronized String getDisplayName() {
+    return displayName;
+  }
+
+  /**
+   * Adds a counter under its own name, replacing any counter of that name.
+   * The counter object itself is stored, not a copy.
+   * @param counter the counter to add
+   */
+  synchronized void addCounter(Counter counter) {
+    counters.put(counter.getName(), counter);
+  }
+
+  /**
+   * Internal to find a counter in a group, creating it if it is missing.
+   * @param counterName the name of the counter
+   * @param displayName the display name to use if the counter is created
+   * @return the counter that was found or added
+   */
+  protected synchronized Counter findCounter(String counterName,
+                                             String displayName) {
+    // Synchronized like every other accessor: the TreeMap is not thread-safe
+    // and this method can be reached without the caller holding the lock.
+    Counter result = counters.get(counterName);
+    if (result == null) {
+      result = new Counter(counterName, displayName);
+      counters.put(counterName, result);
+    }
+    return result;
+  }
+
+  /**
+   * Finds the named counter, creating it if it is missing. A newly created
+   * counter's display name is localized from the group's resource bundle and
+   * falls back to the counter name.
+   * @param counterName the name of the counter
+   * @return the counter that was found or added
+   */
+  public synchronized Counter findCounter(String counterName) {
+    Counter result = counters.get(counterName);
+    if (result == null) {
+      // Only localize on a miss, since it may involve a bundle lookup.
+      String displayName = localize(counterName, counterName);
+      result = new Counter(counterName, displayName);
+      counters.put(counterName, result);
+    }
+    return result;
+  }
+
+  /**
+   * Returns an iterator over the counters in counter-name order. The iterator
+   * is a live view of the internal map and is not synchronized once returned.
+   */
+  @Override
+  public synchronized Iterator<Counter> iterator() {
+    return counters.values().iterator();
+  }
+
+  /**
+   * Writes the display name, the number of counters (as a VInt) and then each
+   * counter. The group's internal name is not written here.
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    Text.writeString(out, displayName);
+    WritableUtils.writeVInt(out, counters.size());
+    for (Counter counter : counters.values()) {
+      counter.write(out);
+    }
+  }
+
+  /**
+   * Replaces the display name and all counters with those read from in, in
+   * the format produced by {@link #write(DataOutput)}. The internal name is
+   * left unchanged.
+   */
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    displayName = Text.readString(in);
+    counters.clear();
+    int size = WritableUtils.readVInt(in);
+    for (int i = 0; i < size; i++) {
+      Counter counter = new Counter();
+      counter.readFields(in);
+      counters.put(counter.getName(), counter);
+    }
+  }
+
+  /**
+   * Looks up key in the ResourceBundle and returns the corresponding value.
+   * If the bundle or the key doesn't exist, returns the default value.
+   * @param key the resource key
+   * @param defaultValue the value to return when no localization exists
+   * @return the localized value, or defaultValue
+   */
+  private String localize(String key, String defaultValue) {
+    String result = defaultValue;
+    if (bundle != null) {
+      try {
+        result = bundle.getString(key);
+      } catch (MissingResourceException ignored) {
+        // No localized text for this key; keep the default.
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns the number of counters in this group.
+   */
+  public synchronized int size() {
+    return counters.size();
+  }
+
+  /**
+   * Two groups are equal when they hold equal counters in the same
+   * (counter-name) order. Group names and display names are not compared.
+   */
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof CounterGroup) {
+      Iterator<Counter> right = ((CounterGroup) genericRight).counters.
+          values().iterator();
+      Iterator<Counter> left = counters.values().iterator();
+      while (left.hasNext()) {
+        if (!right.hasNext() || !left.next().equals(right.next())) {
+          return false;
+        }
+      }
+      return !right.hasNext();
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return counters.hashCode();
+  }
+
+  /**
+   * Adds the value of every counter in rightGroup to the same-named counter
+   * in this group, creating missing counters with the right-hand display
+   * name.
+   * NOTE(review): rightGroup's map is read without holding rightGroup's lock,
+   * so callers must ensure it is not modified concurrently.
+   * @param rightGroup the group whose counter values are added to this one
+   */
+  public synchronized void incrAllCounters(CounterGroup rightGroup) {
+    for (Counter right : rightGroup.counters.values()) {
+      Counter left = findCounter(right.getName(), right.getDisplayName());
+      left.increment(right.getValue());
+    }
+  }
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Counters.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,184 @@
+package org.apache.hadoop.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A set of named {@link CounterGroup}s, kept sorted by group name, that can
+ * be serialized as a {@link Writable}. Counters can be looked up by
+ * group/counter name or directly by {@link Enum} value.
+ * <p>
+ * Methods synchronize on this object. The iterator and the collection
+ * returned by {@link #getGroupNames()} are live views of the internal map,
+ * so callers must not use them while other threads modify this object.
+ */
+public class Counters implements Writable, Iterable<CounterGroup> {
+  /**
+   * A cache from enum values to the associated counter. Dramatically speeds up
+   * typical usage. It must be cleared whenever {@link #groups} is replaced,
+   * or it would hand out counters that no longer belong to this object.
+   */
+  private Map<Enum<?>, Counter> cache = new IdentityHashMap<Enum<?>, Counter>();
+
+  /** Counter groups, keyed and sorted by group name. */
+  private TreeMap<String, CounterGroup> groups =
+      new TreeMap<String, CounterGroup>();
+
+  /** Creates an empty set of counters. */
+  public Counters() {
+  }
+
+  /**
+   * Builds a new-API view of old-API counters. The groups are recreated, but
+   * the counter objects themselves are shared with the source, not copied.
+   * @param counters the old-API counters to wrap
+   */
+  Counters(org.apache.hadoop.mapred.Counters counters) {
+    for (org.apache.hadoop.mapred.Counters.Group group : counters) {
+      String name = group.getName();
+      CounterGroup newGroup = new CounterGroup(name, group.getDisplayName());
+      groups.put(name, newGroup);
+      for (Counter counter : group) {
+        newGroup.addCounter(counter);
+      }
+    }
+  }
+
+  /**
+   * Finds the named counter, creating it (and its group) if necessary.
+   * @param groupName the name of the group
+   * @param counterName the name of the counter within the group
+   * @return the existing or newly created counter
+   */
+  public synchronized Counter findCounter(String groupName,
+                                          String counterName) {
+    // Synchronized: this may insert into the non-thread-safe groups map.
+    return getGroup(groupName).findCounter(counterName);
+  }
+
+  /**
+   * Find the counter for the given enum. The group name is the enum's
+   * declaring class name and the counter name is the enum's toString().
+   * Repeated lookups of the same enum return the same counter object until
+   * {@link #readFields(DataInput)} replaces the contents.
+   * @param key the counter key
+   * @return the matching counter object
+   */
+  public synchronized Counter findCounter(Enum<?> key) {
+    Counter counter = cache.get(key);
+    if (counter == null) {
+      counter = findCounter(key.getDeclaringClass().getName(), key.toString());
+      cache.put(key, counter);
+    }
+    return counter;
+  }
+
+  /**
+   * Returns the names of all counter groups.
+   * @return a live view of the group names, in sorted order
+   */
+  public synchronized Collection<String> getGroupNames() {
+    return groups.keySet();
+  }
+
+  /** Returns an iterator over the groups in group-name order (live view). */
+  @Override
+  public synchronized Iterator<CounterGroup> iterator() {
+    return groups.values().iterator();
+  }
+
+  /**
+   * Returns the named counter group, or an empty group if there is none
+   * with the specified name. A newly created empty group is registered, so
+   * counters later found in it are part of this object. Never returns null.
+   * @param groupName the name of the group
+   * @return the existing or newly created group
+   */
+  public synchronized CounterGroup getGroup(String groupName) {
+    CounterGroup grp = groups.get(groupName);
+    if (grp == null) {
+      grp = new CounterGroup(groupName);
+      groups.put(groupName, grp);
+    }
+    return grp;
+  }
+
+  /**
+   * Returns the total number of counters, by summing the number of counters
+   * in each group.
+   */
+  public synchronized int countCounters() {
+    int result = 0;
+    for (CounterGroup group : this) {
+      result += group.size();
+    }
+    return result;
+  }
+
+  /**
+   * Write the set of groups.
+   * The external format is:
+   *   #groups (groupName group)*
+   *
+   * i.e. the number of groups (a 4-byte int) followed by 0 or more groups,
+   * each preceded by its name. Each group is written by
+   * {@link CounterGroup#write(DataOutput)} in the form:
+   *
+   *   groupDisplayName #counters (VInt) counter*
+   *
+   * where the counter encoding is defined by the counter's own write method.
+   */
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    out.writeInt(groups.size());
+    for (CounterGroup group : groups.values()) {
+      Text.writeString(out, group.getName());
+      group.write(out);
+    }
+  }
+
+  /**
+   * Read a set of groups, replacing the current contents, in the format
+   * produced by {@link #write(DataOutput)}.
+   */
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    int numClasses = in.readInt();
+    groups.clear();
+    // Cached enum lookups point at counters of the discarded groups; drop
+    // them so findCounter(Enum) resolves against the freshly read groups.
+    cache.clear();
+    while (numClasses-- > 0) {
+      String groupName = Text.readString(in);
+      CounterGroup group = new CounterGroup(groupName);
+      group.readFields(in);
+      groups.put(groupName, group);
+    }
+  }
+
+  /**
+   * Return textual representation of the counter values.
+   */
+  @Override
+  public synchronized String toString() {
+    StringBuilder sb = new StringBuilder("Counters: ").append(countCounters());
+    for (CounterGroup group : this) {
+      sb.append("\n\t").append(group.getDisplayName());
+      for (Counter counter : group) {
+        sb.append("\n\t\t").append(counter.getDisplayName())
+          .append('=').append(counter.getValue());
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Increments multiple counters by their amounts in another Counters
+   * instance, creating any missing groups and counters.
+   * NOTE(review): other is read without holding its lock, so callers must
+   * ensure it is not modified concurrently.
+   * @param other the other Counters instance
+   */
+  public synchronized void incrAllCounters(Counters other) {
+    for (Map.Entry<String, CounterGroup> rightEntry : other.groups.entrySet()) {
+      CounterGroup left = groups.get(rightEntry.getKey());
+      CounterGroup right = rightEntry.getValue();
+      if (left == null) {
+        left = new CounterGroup(right.getName(), right.getDisplayName());
+        groups.put(rightEntry.getKey(), left);
+      }
+      left.incrAllCounters(right);
+    }
+  }
+
+  /**
+   * Two Counters are equal when they have the same group names mapped to
+   * equal groups. Group names are compared so that equality agrees with
+   * {@link #hashCode()}, which is derived from the whole name-to-group map.
+   */
+  @Override
+  public synchronized boolean equals(Object genericRight) {
+    if (genericRight instanceof Counters) {
+      return groups.equals(((Counters) genericRight).groups);
+    }
+    return false;
+  }
+
+  @Override
+  public synchronized int hashCode() {
+    return groups.hashCode();
+  }
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ID.java Mon Dec 15 14:21:32 2008
@@ -34,6 +34,7 @@
* @see TaskAttemptID
*/
public abstract class ID implements WritableComparable<ID> {
+ protected static final char SEPARATOR = '_';
protected int id;
/** constructs an ID object from the given int */
@@ -85,4 +86,5 @@
public void write(DataOutput out) throws IOException {
+ // Writes only the numeric id, as a 4-byte int.
out.writeInt(id);
}
+
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Job.java Mon Dec 15 14:21:32 2008
@@ -21,10 +21,12 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
/**
@@ -34,27 +36,40 @@
* IllegalStateException.
*/
public class Job extends JobContext {
-
- public Job() {
+ /**
+ * Lifecycle of a job: DEFINE while it is being configured, RUNNING once
+ * submit() has handed it to the cluster.
+ */
+ public static enum JobState {DEFINE, RUNNING};
+ // Current lifecycle state; starts in DEFINE and is set to RUNNING by submit().
+ private JobState state = JobState.DEFINE;
+ // Client that submit() uses to hand this job's configuration to the cluster.
+ private JobClient jobTracker;
+ // Handle to the submitted job; null until submit() succeeds.
+ private RunningJob info;
+
+ /**
+ * Creates a job using a new default {@link Configuration}.
+ * @throws IOException if creating the {@link JobClient} fails
+ */
+ public Job() throws IOException {
this(new Configuration());
}
- public Job(Configuration conf) {
+ /**
+ * Creates a job for the given configuration, along with the
+ * {@link JobClient} that will later submit it.
+ * @param conf the job configuration
+ * @throws IOException if creating the {@link JobClient} fails
+ */
+ public Job(Configuration conf) throws IOException {
super(conf, null);
+ // NOTE(review): assumes getConfiguration() returns the JobConf built by
+ // JobContext; the cast fails otherwise.
+ jobTracker = new JobClient((JobConf) getConfiguration());
}
- public Job(Configuration conf, String jobName) {
+ /**
+ * Creates a job for the given configuration and sets its name.
+ * @param conf the job configuration
+ * @param jobName the name of the job
+ * @throws IOException if creating the {@link JobClient} fails
+ */
+ public Job(Configuration conf, String jobName) throws IOException {
this(conf);
setJobName(jobName);
}
+ /**
+ * Checks that the job is in the required lifecycle state.
+ * @param state the state the caller requires
+ * @throws IllegalStateException if the job is in any other state
+ */
+ private void ensureState(JobState state) throws IllegalStateException {
+ if (state != this.state) {
+ throw new IllegalStateException("Job in state "+ this.state +
+ " instead of " + state);
+ }
+ }
+
/**
* Set the number of reduce tasks for the job.
* @param tasks the number of reduce tasks
* @throws IllegalStateException if the job is submitted
*/
public void setNumReduceTasks(int tasks) throws IllegalStateException {
- conf.setInt(NUM_REDUCES_ATTR, tasks);
+ // Setters only work while the job is still being defined.
+ ensureState(JobState.DEFINE);
+ conf.setNumReduceTasks(tasks);
}
/**
@@ -64,8 +79,8 @@
* @throws IllegalStateException if the job is submitted
*/
public void setWorkingDirectory(Path dir) throws IOException {
- dir = dir.makeQualified(FileSystem.get(conf));
- conf.set(WORKING_DIR_ATTR, dir.toString());
+ // NOTE(review): the removed code qualified dir against the default
+ // FileSystem before storing it; confirm JobConf.setWorkingDirectory
+ // still resolves relative paths.
+ ensureState(JobState.DEFINE);
+ conf.setWorkingDirectory(dir);
}
/**
@@ -75,6 +90,7 @@
*/
public void setInputFormatClass(Class<? extends InputFormat<?,?>> cls
) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ // Read back by JobContext.getInputFormatClass(), which defaults to
+ // TextInputFormat when unset.
conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
}
@@ -85,6 +101,7 @@
*/
public void setOutputFormatClass(Class<? extends OutputFormat<?,?>> cls
) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ // Read back by JobContext.getOutputFormatClass(), which defaults to
+ // TextOutputFormat when unset.
conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
}
@@ -95,16 +112,34 @@
*/
public void setMapperClass(Class<? extends Mapper<?,?,?,?>> cls
) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ // Stored under the new-API key; setUseNewAPI() rejects combining it with
+ // the old-API mapper setting.
conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
}
/**
+   * Set the Jar by finding where a given class came from.
+   * @param cls the example class
+   * @throws IllegalStateException if the job is submitted
+   */
+  public void setJarByClass(Class<?> cls) throws IllegalStateException {
+    // Like every other setter, the jar is part of the job definition and
+    // must not change once submit() has been called.
+    ensureState(JobState.DEFINE);
+    conf.setJarByClass(cls);
+  }
+
+  /**
+   * Get the pathname of the job's jar.
+   * @return the pathname
+   */
+  @Override
+  public String getJar() {
+    return conf.getJar();
+  }
+
+ /**
* Set the combiner class for the job.
* @param cls the combiner to use
* @throws IllegalStateException if the job is submitted
*/
public void setCombinerClass(Class<? extends Reducer<?,?,?,?>> cls
) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ // Read back by JobContext.getCombinerClass(), which returns null if unset.
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
@@ -115,6 +150,7 @@
*/
public void setReducerClass(Class<? extends Reducer<?,?,?,?>> cls
) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
}
@@ -125,6 +161,7 @@
*/
public void setPartitionerClass(Class<? extends Partitioner<?,?>> cls
) throws IllegalStateException {
+ ensureState(JobState.DEFINE);
+ // Read back by JobContext.getPartitionerClass(), which defaults to
+ // HashPartitioner when unset.
conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
}
@@ -138,7 +175,8 @@
*/
public void setMapOutputKeyClass(Class<?> theClass
) throws IllegalStateException {
- conf.setClass(MAP_OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
+ ensureState(JobState.DEFINE);
+ // Stored through the JobConf setter; JobContext reads it back with
+ // conf.getMapOutputKeyClass().
+ conf.setMapOutputKeyClass(theClass);
}
/**
@@ -151,7 +189,8 @@
*/
public void setMapOutputValueClass(Class<?> theClass
) throws IllegalStateException {
- conf.setClass(MAP_OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
+ ensureState(JobState.DEFINE);
+ conf.setMapOutputValueClass(theClass);
}
/**
@@ -162,7 +201,8 @@
*/
public void setOutputKeyClass(Class<?> theClass
) throws IllegalStateException {
- conf.setClass(OUTPUT_KEY_CLASS_ATTR, theClass, Object.class);
+ ensureState(JobState.DEFINE);
+ conf.setOutputKeyClass(theClass);
}
/**
@@ -173,7 +213,8 @@
*/
public void setOutputValueClass(Class<?> theClass
) throws IllegalStateException {
- conf.setClass(OUTPUT_VALUE_CLASS_ATTR, theClass, Object.class);
+ ensureState(JobState.DEFINE);
+ conf.setOutputValueClass(theClass);
}
/**
@@ -184,19 +225,22 @@
*/
public void setSortComparatorClass(Class<? extends RawComparator<?>> cls
) throws IllegalStateException {
- conf.setClass(SORT_COMPARATOR_ATTR, cls, RawComparator.class);
+ ensureState(JobState.DEFINE);
+ // JobContext.getSortComparator() reads this back via
+ // conf.getOutputKeyComparator().
+ conf.setOutputKeyComparatorClass(cls);
}
/**
* Define the comparator that controls which keys are grouped together
* for a single call to
- * {@link Reducer#reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
+ * {@link Reducer#reduce(Object, Iterable,
+ * org.apache.hadoop.mapreduce.Reducer.Context)}
* @param cls the raw comparator to use
* @throws IllegalStateException if the job is submitted
*/
public void setGroupingComparatorClass(Class<? extends RawComparator<?>> cls
) throws IllegalStateException {
- conf.setClass(GROUPING_COMPARATOR_ATTR, cls, RawComparator.class);
+ ensureState(JobState.DEFINE);
+ // JobContext.getGroupingComparator() reads this back via
+ // conf.getOutputValueGroupingComparator().
+ conf.setOutputValueGroupingComparator(cls);
}
/**
@@ -206,7 +250,8 @@
* @throws IllegalStateException if the job is submitted
*/
public void setJobName(String name) throws IllegalStateException {
- conf.set(JOB_NAME_ATTR, name);
+ ensureState(JobState.DEFINE);
+ conf.setJobName(name);
}
/**
@@ -215,8 +260,8 @@
* @return the URL where some job progress information will be displayed.
*/
public String getTrackingURL() {
- // TODO
- return null;
+ // Status queries need the RunningJob handle that submit() creates.
+ ensureState(JobState.RUNNING);
+ return info.getTrackingURL();
}
/**
@@ -227,8 +272,8 @@
* @throws IOException
*/
public float mapProgress() throws IOException {
- // TODO
- return 0.0f;
+ ensureState(JobState.RUNNING);
+ return info.mapProgress();
}
/**
@@ -239,8 +284,8 @@
* @throws IOException
*/
public float reduceProgress() throws IOException {
- // TODO
- return 0.0f;
+ ensureState(JobState.RUNNING);
+ return info.reduceProgress();
}
/**
@@ -251,8 +296,8 @@
* @throws IOException
*/
public boolean isComplete() throws IOException {
- // TODO
- return false;
+ ensureState(JobState.RUNNING);
+ return info.isComplete();
}
/**
@@ -262,8 +307,8 @@
* @throws IOException
*/
public boolean isSuccessful() throws IOException {
- // TODO
- return false;
+ ensureState(JobState.RUNNING);
+ return info.isSuccessful();
}
/**
@@ -273,7 +318,8 @@
* @throws IOException
*/
public void killJob() throws IOException {
- // TODO
+ ensureState(JobState.RUNNING);
+ info.killJob();
}
/**
@@ -285,8 +331,8 @@
*/
public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
) throws IOException {
- // TODO
- return null;
+ ensureState(JobState.RUNNING);
+ return info.getTaskCompletionEvents(startFrom);
}
/**
@@ -296,7 +342,9 @@
* @throws IOException
*/
public void killTask(TaskAttemptID taskId) throws IOException {
- // TODO
+ ensureState(JobState.RUNNING);
+ // RunningJob takes the old-API id type. The boolean separates kill (false)
+ // from fail (true, see failTask).
+ info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
+ false);
}
/**
@@ -306,7 +354,9 @@
* @throws IOException
*/
public void failTask(TaskAttemptID taskId) throws IOException {
- // TODO
+ ensureState(JobState.RUNNING);
+ // Same call as killTask, but with the flag set to true.
+ info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
+ true);
}
/**
@@ -316,17 +366,77 @@
* @throws IOException
*/
public Iterable<CounterGroup> getCounters() throws IOException {
- // TODO
- return null;
+ ensureState(JobState.RUNNING);
+ // Wraps the old-API counters; the Counter objects are shared, not copied.
+ return new Counters(info.getCounters());
+ }
+
+ /**
+ * Fails if the given configuration key has any value.
+ * @param attr the configuration key that must be unset
+ * @param msg the name of the API mode, used in the error message
+ * @throws IOException if attr is set
+ */
+ private void ensureNotSet(String attr, String msg) throws IOException {
+ if (conf.get(attr) != null) {
+ throw new IOException(attr + " is incompatible with " + msg + " mode.");
+ }
+ }
+
+ /**
+ * Default to the new APIs unless they are explicitly set or the old mapper or
+ * reduce attributes are used. Then check that no setting from the other API
+ * is present.
+ * @throws IOException if the configuration is inconsistent
+ */
+ private void setUseNewAPI() throws IOException {
+ int numReduces = conf.getNumReduceTasks();
+ String oldMapperClass = "mapred.mapper.class";
+ String oldReduceClass = "mapred.reducer.class";
+ String oldCombineClass = "mapred.combiner.class";
+ // Choose the new mapper API unless the caller chose explicitly or set an
+ // old-API mapper class.
+ conf.setBooleanIfUnset("mapred.mapper.new-api",
+ conf.get(oldMapperClass) == null);
+ if (conf.getUseNewMapper()) {
+ // New map API: reject old-API (mapred.*) map-side settings.
+ String mode = "new map API";
+ ensureNotSet("mapred.input.format.class", mode);
+ ensureNotSet(oldMapperClass, mode);
+ if (numReduces != 0) {
+ ensureNotSet(oldCombineClass, mode);
+ ensureNotSet("mapred.partitioner.class", mode);
+ } else {
+ // With no reduces the reduce-side checks below are skipped, so the
+ // output format is checked here.
+ ensureNotSet("mapred.output.format.class", mode);
+ }
+ } else {
+ // Old map API: reject new-API (mapreduce.*) map-side settings.
+ String mode = "map compatability";
+ ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
+ ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
+ if (numReduces != 0) {
+ ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
+ ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
+ } else {
+ ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
+ }
+ }
+ // The reducer API is only chosen (and checked) when there are reduces.
+ if (numReduces != 0) {
+ conf.setBooleanIfUnset("mapred.reducer.new-api",
+ conf.get(oldReduceClass) == null);
+ if (conf.getUseNewReducer()) {
+ String mode = "new reduce API";
+ ensureNotSet("mapred.output.format.class", mode);
+ ensureNotSet(oldReduceClass, mode);
+ ensureNotSet(oldCombineClass, mode);
+ } else {
+ String mode = "reduce compatability";
+ ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
+ ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);
+ ensureNotSet(JobContext.COMBINE_CLASS_ATTR, mode);
+ }
+ }
}
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
+ * @throws InterruptedException propagated from job submission
+ * @throws ClassNotFoundException propagated from job submission
+ * @throws IllegalStateException if the job was already submitted
*/
- public void submit() throws IOException {
- // TODO
- }
+ public void submit() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ ensureState(JobState.DEFINE);
+ setUseNewAPI();
+ info = jobTracker.submitJobInternal(conf);
+ // Only move to RUNNING once submission succeeded; if submission throws,
+ // the job stays in DEFINE.
+ state = JobState.RUNNING;
+ }
/**
* Submit the job to the cluster and wait for it to finish.
@@ -334,8 +444,12 @@
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
+ * @throws InterruptedException propagated from job submission
+ * @throws ClassNotFoundException propagated from job submission
+ * @return true if the job completed successfully
*/
- public boolean waitForCompletion() throws IOException {
- // TODO
- return false;
+ public boolean waitForCompletion() throws IOException, InterruptedException,
+ ClassNotFoundException {
+ // Submits first if needed, so this works whether or not submit() was called.
+ if (state == JobState.DEFINE) {
+ submit();
+ }
+ info.waitForCompletion();
+ return isSuccessful();
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Mon Dec 15 14:21:32 2008
@@ -21,15 +21,12 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* A read-only view of the job that is provided to the tasks while they
@@ -38,35 +35,21 @@
public class JobContext {
// Put all of the attribute names in here so that Job and JobContext are
// consistent.
+ // Only the new-API class settings keep their own keys here; the other job
+ // properties are read and written through JobConf's accessors.
- protected static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.map.class";
+ protected static final String INPUT_FORMAT_CLASS_ATTR =
+ "mapreduce.inputformat.class";
protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
protected static final String OUTPUT_FORMAT_CLASS_ATTR =
"mapreduce.outputformat.class";
- protected static final String OUTPUT_KEY_CLASS_ATTR =
- "mapreduce.out.key.class";
- protected static final String OUTPUT_VALUE_CLASS_ATTR =
- "mapreduce.out.value.class";
- protected static final String MAP_OUTPUT_KEY_CLASS_ATTR =
- "mapreduce.map.out.key.class";
- protected static final String MAP_OUTPUT_VALUE_CLASS_ATTR =
- "mapreduce.map.out.value.class";
- protected static final String NUM_REDUCES_ATTR = "mapreduce.reduce.tasks";
- protected static final String WORKING_DIR_ATTR = "mapreduce.work.dir";
- protected static final String JOB_NAME_ATTR = "mapreduce.job.name";
- protected static final String SORT_COMPARATOR_ATTR =
- "mapreduce.sort.comparator";
- protected static final String GROUPING_COMPARATOR_ATTR =
- "mapreduce.grouping.comparator";
protected static final String PARTITIONER_CLASS_ATTR =
"mapreduce.partitioner.class";
- protected final Configuration conf;
+ // Old-API JobConf built from the Configuration passed to the constructor;
+ // the typed getters below delegate to it and Job's setters write to it.
+ protected final org.apache.hadoop.mapred.JobConf conf;
private final JobID jobId;
+ /**
+ * Creates a context backed by a new
+ * {@link org.apache.hadoop.mapred.JobConf} built from conf.
+ * @param conf the job configuration
+ * @param jobId the id of the job (Job passes null)
+ */
public JobContext(Configuration conf, JobID jobId) {
- this.conf = conf;
+ this.conf = new org.apache.hadoop.mapred.JobConf(conf);
this.jobId = jobId;
}
@@ -92,7 +75,7 @@
* @return the number of reduce tasks for this job.
*/
public int getNumReduceTasks() {
- return conf.getInt(NUM_REDUCES_ATTR, 1);
+ // NOTE(review): the removed code defaulted to 1 reduce; confirm that
+ // JobConf.getNumReduceTasks() uses the same default.
+ return conf.getNumReduceTasks();
}
/**
@@ -101,14 +84,7 @@
* @return the directory name.
*/
public Path getWorkingDirectory() throws IOException {
- String name = conf.get(WORKING_DIR_ATTR);
- if (name != null) {
- return new Path(name);
- } else {
- Path dir = FileSystem.get(conf).getWorkingDirectory();
- conf.set(WORKING_DIR_ATTR, dir.toString());
- return dir;
- }
+ // NOTE(review): the removed code fell back to (and cached) the default
+ // FileSystem's working directory; confirm JobConf does the same.
+ return conf.getWorkingDirectory();
}
/**
@@ -116,8 +92,7 @@
* @return the key class for the job output data.
*/
public Class<?> getOutputKeyClass() {
- return conf.getClass(OUTPUT_KEY_CLASS_ATTR,
- LongWritable.class, Object.class);
+ // NOTE(review): the removed code defaulted to LongWritable; confirm
+ // JobConf's default.
+ return conf.getOutputKeyClass();
}
/**
@@ -125,7 +100,7 @@
* @return the value class for job outputs.
*/
public Class<?> getOutputValueClass() {
- return conf.getClass(OUTPUT_VALUE_CLASS_ATTR, Text.class, Object.class);
+ // NOTE(review): the removed code defaulted to Text; confirm JobConf's
+ // default.
+ return conf.getOutputValueClass();
}
/**
@@ -135,12 +110,7 @@
* @return the map output key class.
*/
public Class<?> getMapOutputKeyClass() {
- Class<?> retv = conf.getClass(MAP_OUTPUT_KEY_CLASS_ATTR, null,
- Object.class);
- if (retv == null) {
- retv = getOutputKeyClass();
- }
- return retv;
+ // NOTE(review): the removed code fell back to getOutputKeyClass() when
+ // unset; confirm JobConf.getMapOutputKeyClass() keeps that fallback.
+ return conf.getMapOutputKeyClass();
}
/**
@@ -151,12 +121,7 @@
* @return the map output value class.
*/
public Class<?> getMapOutputValueClass() {
- Class<?> retv = conf.getClass(MAP_OUTPUT_VALUE_CLASS_ATTR, null,
- Object.class);
- if (retv == null) {
- retv = getOutputValueClass();
- }
- return retv;
+ // NOTE(review): the removed code fell back to getOutputValueClass() when
+ // unset; confirm JobConf.getMapOutputValueClass() keeps that fallback.
+ return conf.getMapOutputValueClass();
}
/**
@@ -166,7 +131,7 @@
* @return the job's name, defaulting to "".
*/
public String getJobName() {
- return conf.get(JOB_NAME_ATTR, "");
+ // NOTE(review): the javadoc promises a default of ""; confirm that
+ // JobConf.getJobName() keeps it.
+ return conf.getJobName();
}
/**
@@ -178,7 +143,7 @@
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
+ // Defaults to TextInputFormat when no input format is configured.
return (Class<? extends InputFormat<?,?>>)
- conf.getClass(INPUT_FORMAT_CLASS_ATTR, InputFormat.class);
+ conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
/**
@@ -202,7 +167,7 @@
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
throws ClassNotFoundException {
+ // Returns null when no combiner is configured.
return (Class<? extends Reducer<?,?,?,?>>)
- conf.getClass(COMBINE_CLASS_ATTR, Reducer.class);
+ conf.getClass(COMBINE_CLASS_ATTR, null);
}
/**
@@ -226,7 +191,7 @@
public Class<? extends OutputFormat<?,?>> getOutputFormatClass()
throws ClassNotFoundException {
+ // Defaults to TextOutputFormat when no output format is configured.
return (Class<? extends OutputFormat<?,?>>)
- conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, OutputFormat.class);
+ conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
}
/**
@@ -238,7 +203,7 @@
public Class<? extends Partitioner<?,?>> getPartitionerClass()
throws ClassNotFoundException {
+ // Defaults to HashPartitioner when no partitioner is configured.
return (Class<? extends Partitioner<?,?>>)
- conf.getClass(PARTITIONER_CLASS_ATTR, Partitioner.class);
+ conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
/**
@@ -246,14 +211,16 @@
*
* @return the {@link RawComparator} comparator used to compare keys.
*/
- @SuppressWarnings("unchecked")
public RawComparator<?> getSortComparator() {
- Class<?> theClass = conf.getClass(SORT_COMPARATOR_ATTR, null,
- RawComparator.class);
- if (theClass != null)
- return (RawComparator<?>) ReflectionUtils.newInstance(theClass, conf);
- return WritableComparator.get(
- (Class<? extends WritableComparable>)getMapOutputKeyClass());
+ // Resolved by JobConf; Job.setSortComparatorClass stores it through
+ // JobConf.setOutputKeyComparatorClass.
+ return conf.getOutputKeyComparator();
+ }
+
+ /**
+ * Get the pathname of the job's jar.
+ * @return the pathname
+ */
+ public String getJar() {
+ return conf.getJar();
}
/**
@@ -264,12 +231,6 @@
* @see Job#setGroupingComparatorClass(Class) for details.
*/
public RawComparator<?> getGroupingComparator() {
- Class<?> theClass = conf.getClass(GROUPING_COMPARATOR_ATTR, null,
- RawComparator.class);
- if (theClass == null) {
- return getSortComparator();
- }
- return (RawComparator<?>) ReflectionUtils.newInstance(theClass, conf);
+ // NOTE(review): the removed code fell back to getSortComparator() when no
+ // grouping comparator was set; confirm JobConf keeps that fallback.
+ return conf.getOutputValueGroupingComparator();
}
-
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/JobID.java Mon Dec 15 14:21:32 2008
@@ -24,7 +24,6 @@
import java.text.NumberFormat;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobTracker;
/**
* JobID represents the immutable and unique identifier for
@@ -42,15 +41,15 @@
*
* @see TaskID
* @see TaskAttemptID
- * @see JobTracker#getNewJobId()
- * @see JobTracker#getStartTime()
+ * @see org.apache.hadoop.mapred.JobTracker#getNewJobId()
+ * @see org.apache.hadoop.mapred.JobTracker#getStartTime()
*/
-public class JobID extends ID {
- private static final String JOB = "job";
- private String jtIdentifier;
- private static char UNDERSCORE = '_';
+public class JobID extends org.apache.hadoop.mapred.ID
+ implements Comparable<ID> {
+ protected static final String JOB = "job";
+ private final Text jtIdentifier;
- private static NumberFormat idFormat = NumberFormat.getInstance();
+ protected static final NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(4);
@@ -63,13 +62,15 @@
*/
public JobID(String jtIdentifier, int id) {
super(id);
- this.jtIdentifier = jtIdentifier;
+ // Held as a Text so readFields()/write() can (de)serialize it in place.
+ this.jtIdentifier = new Text(jtIdentifier);
}
- private JobID() { }
+ /**
+ * Creates an empty JobID with an empty identifier, presumably to be filled
+ * in by readFields().
+ */
+ public JobID() {
+ jtIdentifier = new Text();
+ }
+ /** @return the jtIdentifier component of this id, as a String */
public String getJtIdentifier() {
- return jtIdentifier;
+ return jtIdentifier.toString();
}
@Override
@@ -92,42 +93,40 @@
else return jtComp;
}
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- return builder.append(JOB).append(UNDERSCORE)
- .append(toStringWOPrefix()).toString();
- }
-
- /** Returns the string representation w/o prefix */
- StringBuilder toStringWOPrefix() {
- StringBuilder builder = new StringBuilder();
- builder.append(jtIdentifier).append(UNDERSCORE)
- .append(idFormat.format(id)).toString();
+ /**
+ * Append the part of the ID after the "job" prefix to the given builder. This is useful,
+ * because the sub-ids use this substring at the start of their string.
+ * @param builder the builder to append to
+ * @return the builder that was passed in
+ */
+ public StringBuilder appendTo(StringBuilder builder) {
+ builder.append(SEPARATOR);
+ builder.append(jtIdentifier);
+ builder.append(SEPARATOR);
+ builder.append(idFormat.format(id));
return builder;
}
-
+
@Override
public int hashCode() {
- return toStringWOPrefix().toString().hashCode();
+ return jtIdentifier.hashCode() + id;
}
-
+
+ @Override
+ public String toString() {
+ return appendTo(new StringBuilder(JOB)).toString();
+ }
+
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- this.jtIdentifier = Text.readString(in);
+ this.jtIdentifier.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- Text.writeString(out, jtIdentifier);
- }
-
- public static JobID read(DataInput in) throws IOException {
- JobID jobId = new JobID();
- jobId.readFields(in);
- return jobId;
+ jtIdentifier.write(out);
}
/** Construct a JobId object from given string
@@ -141,7 +140,8 @@
String[] parts = str.split("_");
if(parts.length == 3) {
if(parts[0].equals(JOB)) {
- return new JobID(parts[1], Integer.parseInt(parts[2]));
+ return new org.apache.hadoop.mapred.JobID(parts[1],
+ Integer.parseInt(parts[2]));
}
}
}catch (Exception ex) {//fall below
@@ -150,32 +150,4 @@
+ " is not properly formed");
}
- /**
- * Returns a regex pattern which matches task IDs. Arguments can
- * be given null, in which case that part of the regex will be generic.
- * For example to obtain a regex matching <i>any job</i>
- * run on the jobtracker started at <i>200707121733</i>, we would use :
- * <pre>
- * JobID.getTaskIDsPattern("200707121733", null);
- * </pre>
- * which will return :
- * <pre> "job_200707121733_[0-9]*" </pre>
- * @param jtIdentifier jobTracker identifier, or null
- * @param jobId job number, or null
- * @return a regex pattern matching JobIDs
- */
- public static String getJobIDsPattern(String jtIdentifier, Integer jobId) {
- StringBuilder builder = new StringBuilder(JOB).append(UNDERSCORE);
- builder.append(getJobIDsPatternWOPrefix(jtIdentifier, jobId));
- return builder.toString();
- }
-
- static StringBuilder getJobIDsPatternWOPrefix(String jtIdentifier
- , Integer jobId) {
- StringBuilder builder = new StringBuilder()
- .append(jtIdentifier != null ? jtIdentifier : "[^_]*").append(UNDERSCORE)
- .append(jobId != null ? idFormat.format(jobId) : "[0-9]*");
- return builder;
- }
-
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/MapContext.java Mon Dec 15 14:21:32 2008
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
/**
@@ -27,17 +29,43 @@
* @param <KEYOUT> the key output type from the Mapper
* @param <VALUEOUT> the value output type from the Mapper
*/
-public abstract class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+ private RecordReader<KEYIN,VALUEIN> reader;
+ private InputSplit split;
- public MapContext(Configuration conf, TaskAttemptID taskid) {
- super(conf, taskid);
+ public MapContext(Configuration conf, TaskAttemptID taskid,
+ RecordReader<KEYIN,VALUEIN> reader,
+ RecordWriter<KEYOUT,VALUEOUT> writer,
+ OutputCommitter committer,
+ StatusReporter reporter,
+ InputSplit split) {
+ super(conf, taskid, writer, committer, reporter);
+ this.reader = reader;
+ this.split = split;
}
/**
* Get the input split for this map.
*/
- public abstract InputSplit getInputSplit();
+ public InputSplit getInputSplit() {
+ return split;
+ }
+
+ @Override
+ public KEYIN getCurrentKey() throws IOException, InterruptedException {
+ return reader.getCurrentKey();
+ }
+
+ @Override
+ public VALUEIN getCurrentValue() throws IOException, InterruptedException {
+ return reader.getCurrentValue();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ return reader.nextKeyValue();
+ }
}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Mapper.java Mon Dec 15 14:21:32 2008
@@ -94,10 +94,15 @@
*/
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
- public abstract class Context
+ public class Context
extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- public Context(Configuration conf, TaskAttemptID taskid) {
- super(conf, taskid);
+ public Context(Configuration conf, TaskAttemptID taskid,
+ RecordReader<KEYIN,VALUEIN> reader,
+ RecordWriter<KEYOUT,VALUEOUT> writer,
+ OutputCommitter committer,
+ StatusReporter reporter,
+ InputSplit split) throws IOException, InterruptedException {
+ super(conf, taskid, reader, writer, committer, reporter, split);
}
}
@@ -116,7 +121,7 @@
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
- context.collect((KEYOUT) key, (VALUEOUT) value);
+ context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
@@ -135,12 +140,8 @@
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
- KEYIN key = context.nextKey(null);
- VALUEIN value = null;
- while (key != null) {
- value = context.nextValue(value);
- map(key, value, context);
- key = context.nextKey(key);
+ while (context.nextKeyValue()) {
+ map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputCommitter.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+import java.io.IOException;
+
+/**
+ * <code>OutputCommitter</code> describes the commit of task output for a
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputCommitter</code> of
+ * the job to:</p>
+ * <ol>
+ * <li>
+ * Set up the job during initialization. For example, create the temporary
+ * output directory for the job during the initialization of the job.
+ * </li>
+ * <li>
+ * Cleanup the job after the job completion. For example, remove the
+ * temporary output directory after the job completion.
+ * </li>
+ * <li>
+ * Set up the task's temporary output.
+ * </li>
+ * <li>
+ * Check whether a task needs a commit. This is to avoid the commit
+ * procedure if a task does not need commit.
+ * </li>
+ * <li>
+ * Commit of the task output.
+ * </li>
+ * <li>
+ * Discard the task commit.
+ * </li>
+ * </ol>
+ *
+ * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ * @see JobContext
+ * @see TaskAttemptContext
+ *
+ */
+public abstract class OutputCommitter {
+ /**
+ * Called by the framework to set up the job output during initialization.
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @throws IOException if temporary output could not be created
+ */
+ public abstract void setupJob(JobContext jobContext) throws IOException;
+
+ /**
+ * For cleaning up the job's output after job completion
+ *
+ * @param jobContext Context of the job whose output is being written.
+ * @throws IOException
+ */
+ public abstract void cleanupJob(JobContext jobContext) throws IOException;
+
+ /**
+ * Sets up output for the task.
+ *
+ * @param taskContext Context of the task whose output is being written.
+ * @throws IOException
+ */
+ public abstract void setupTask(TaskAttemptContext taskContext)
+ throws IOException;
+
+ /**
+ * Check whether task needs a commit
+ *
+ * @param taskContext
+ * @return true if the task's output needs to be committed, false otherwise
+ * @throws IOException
+ */
+ public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
+ throws IOException;
+
+ /**
+ * To promote the task's temporary output to final output location
+ *
+ * The task's output is moved to the job's output directory.
+ *
+ * @param taskContext Context of the task whose output is being written.
+ * @throws IOException if the commit is not successful
+ */
+ public abstract void commitTask(TaskAttemptContext taskContext)
+ throws IOException;
+
+ /**
+ * Discard the task output
+ *
+ * @param taskContext
+ * @throws IOException
+ */
+ public abstract void abortTask(TaskAttemptContext taskContext)
+ throws IOException;
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/OutputFormat.java Mon Dec 15 14:21:32 2008
@@ -68,5 +68,17 @@
public abstract void checkOutputSpecs(JobContext context
) throws IOException,
InterruptedException;
+
+ /**
+ * Get the output committer for this output format. This is responsible
+ * for ensuring the output is committed correctly.
+ * @param context the task context
+ * @return an output committer
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract
+ OutputCommitter getOutputCommitter(TaskAttemptContext context
+ ) throws IOException, InterruptedException;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Partitioner.java Mon Dec 15 14:21:32 2008
@@ -44,4 +44,5 @@
* @return the partition number for the <code>key</code>.
*/
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
+
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordReader.java Mon Dec 15 14:21:32 2008
@@ -41,24 +41,31 @@
) throws IOException, InterruptedException;
/**
- * Read the next key.
- * @param key the object to be read into, which may be null
- * @return the object that was read
+ * Read the next key, value pair.
+ * @return true if a key/value pair was read
* @throws IOException
* @throws InterruptedException
*/
- public abstract KEYIN nextKey(KEYIN key
- ) throws IOException, InterruptedException;
+ public abstract
+ boolean nextKeyValue() throws IOException, InterruptedException;
/**
- * Read the next value. It must be called after {@link #nextKey(Object)}.
- * @param value the object to read into, which may be null
+ * Get the current key
+ * @return the current key or null if there is no current key
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract
+ KEYIN getCurrentKey() throws IOException, InterruptedException;
+
+ /**
+ * Get the current value.
* @return the object that was read
* @throws IOException
* @throws InterruptedException
*/
- public abstract VALUEIN nextValue(VALUEIN value
- ) throws IOException, InterruptedException;
+ public abstract
+ VALUEIN getCurrentValue() throws IOException, InterruptedException;
/**
* The current progress of the record reader through its data.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/RecordWriter.java Mon Dec 15 14:21:32 2008
@@ -31,7 +31,7 @@
*
* @see OutputFormat
*/
-public interface RecordWriter<K, V> {
+public abstract class RecordWriter<K, V> {
/**
* Writes a key/value pair.
*
@@ -39,7 +39,8 @@
* @param value the value to write.
* @throws IOException
*/
- void write(K key, V value) throws IOException, InterruptedException;
+ public abstract void write(K key, V value
+ ) throws IOException, InterruptedException;
/**
* Close this <code>RecordWriter</code> to future operations.
@@ -47,6 +48,6 @@
* @param context the context of the task
* @throws IOException
*/
- void close(TaskAttemptContext context
- ) throws IOException, InterruptedException;
+ public abstract void close(TaskAttemptContext context
+ ) throws IOException, InterruptedException;
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/ReduceContext.java Mon Dec 15 14:21:32 2008
@@ -19,8 +19,17 @@
package org.apache.hadoop.mapreduce;
import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.util.Progressable;
/**
* The context passed to the {@link Reducer}.
@@ -29,19 +38,151 @@
* @param <KEYOUT> the class of the output keys
* @param <VALUEOUT> the class of the output values
*/
-public abstract class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
+public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
+ private RawKeyValueIterator input;
+ private RawComparator<KEYIN> comparator;
+ private KEYIN key; // current key
+ private VALUEIN value; // current value
+ private boolean firstValue = false; // first value in key
+ private boolean nextKeyIsSame = false; // more w/ this key
+ private boolean hasMore; // more in file
+ protected Progressable reporter;
+ private Deserializer<KEYIN> keyDeserializer;
+ private Deserializer<VALUEIN> valueDeserializer;
+ private DataInputBuffer buffer = new DataInputBuffer();
+ private BytesWritable currentRawKey = new BytesWritable();
+ private ValueIterable iterable = new ValueIterable();
+
+ public ReduceContext(Configuration conf, TaskAttemptID taskid,
+ RawKeyValueIterator input,
+ RecordWriter<KEYOUT,VALUEOUT> output,
+ OutputCommitter committer,
+ StatusReporter reporter,
+ RawComparator<KEYIN> comparator,
+ Class<KEYIN> keyClass,
+ Class<VALUEIN> valueClass
+ ) throws InterruptedException, IOException{
+ super(conf, taskid, output, committer, reporter);
+ this.input = input;
+ this.comparator = comparator;
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(buffer);
+ this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
+ this.valueDeserializer.open(buffer);
+ hasMore = input.next();
+ }
- public ReduceContext(Configuration conf, TaskAttemptID taskid) {
- super(conf, taskid);
+ /** Start processing next unique key. */
+ public boolean nextKey() throws IOException,InterruptedException {
+ while (hasMore && nextKeyIsSame) {
+ nextKeyValue();
+ }
+ if (hasMore) {
+ return nextKeyValue();
+ } else {
+ return false;
+ }
}
/**
+ * Advance to the next key/value pair.
+ */
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (!hasMore) {
+ key = null;
+ value = null;
+ return false;
+ }
+ firstValue = !nextKeyIsSame;
+ DataInputBuffer next = input.getKey();
+ currentRawKey.set(next.getData(), next.getPosition(),
+ next.getLength() - next.getPosition());
+ buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+ key = keyDeserializer.deserialize(key);
+ next = input.getValue();
+ buffer.reset(next.getData(), next.getPosition(), next.getLength());
+ value = valueDeserializer.deserialize(value);
+ hasMore = input.next();
+ if (hasMore) {
+ next = input.getKey();
+ nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
+ currentRawKey.getLength(),
+ next.getData(),
+ next.getPosition(),
+ next.getLength() - next.getPosition()
+ ) == 0;
+ } else {
+ nextKeyIsSame = false;
+ }
+ return true;
+ }
+
+ public KEYIN getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public VALUEIN getCurrentValue() {
+ return value;
+ }
+
+ protected class ValueIterator implements Iterator<VALUEIN> {
+
+ @Override
+ public boolean hasNext() {
+ return firstValue || nextKeyIsSame;
+ }
+
+ @Override
+ public VALUEIN next() {
+ // if this is the first record, we don't need to advance
+ if (firstValue) {
+ firstValue = false;
+ return value;
+ }
+ // if this isn't the first record and the next key is different, we
+ // can't advance here.
+ if (!nextKeyIsSame) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ // otherwise, go to the next key/value pair
+ try {
+ nextKeyValue();
+ return value;
+ } catch (IOException ie) {
+ throw new RuntimeException("next value iterator failed", ie);
+ } catch (InterruptedException ie) {
+ // this is bad, but we can't modify the exception list of java.util
+ throw new RuntimeException("next value iterator interrupted", ie);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove not implemented");
+ }
+
+ }
+
+ protected class ValueIterable implements Iterable<VALUEIN> {
+ private ValueIterator iterator = new ValueIterator();
+ @Override
+ public Iterator<VALUEIN> iterator() {
+ return iterator;
+ }
+ }
+
+ /**
* Iterate through the values for the current key, reusing the same value
* object, which is stored in the context.
- * @return the series of values associated with the current key
+ * @return the series of values associated with the current key. All of the
+ * objects returned directly and indirectly from this method are reused.
*/
- public abstract
- Iterable<VALUEIN> getValues() throws IOException, InterruptedException;
-
+ public
+ Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
+ return iterable;
+ }
}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/Reducer.java Mon Dec 15 14:21:32 2008
@@ -21,8 +21,8 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
/**
* Reduces a set of intermediate values which share a key to a smaller set of
@@ -88,7 +88,7 @@
* the sorted inputs.</p>
* <p>The output of the reduce task is typically written to a
* {@link RecordWriter} via
- * {@link Context#collect(Object, Object)}.</p>
+ * {@link Context#write(Object, Object)}.</p>
* </li>
* </ol>
*
@@ -117,10 +117,19 @@
*/
public abstract class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- protected abstract class Context
+ public class Context
extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- public Context(Configuration conf, TaskAttemptID taskid) {
- super(conf, taskid);
+ public Context(Configuration conf, TaskAttemptID taskid,
+ RawKeyValueIterator input,
+ RecordWriter<KEYOUT,VALUEOUT> output,
+ OutputCommitter committer,
+ StatusReporter reporter,
+ RawComparator<KEYIN> comparator,
+ Class<KEYIN> keyClass,
+ Class<VALUEIN> valueClass
+ ) throws IOException, InterruptedException {
+ super(conf, taskid, input, output, committer, reporter, comparator,
+ keyClass, valueClass);
}
}
@@ -141,7 +150,7 @@
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
- context.collect((KEYOUT) key, (VALUEOUT) value);
+ context.write((KEYOUT) key, (VALUEOUT) value);
}
}
@@ -160,10 +169,8 @@
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
- KEYIN key = context.nextKey(null);
- while(key != null) {
- reduce(key, context.getValues(), context);
- key = context.nextKey(key);
+ while (context.nextKey()) {
+ reduce(context.getCurrentKey(), context.getValues(), context);
}
cleanup(context);
}
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java?rev=726850&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/StatusReporter.java Mon Dec 15 14:21:32 2008
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+public abstract class StatusReporter {
+ public abstract Counter getCounter(Enum<?> name);
+ public abstract Counter getCounter(String group, String name);
+ public abstract void progress();
+ public abstract void setStatus(String status);
+}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptContext.java Mon Dec 15 14:21:32 2008
@@ -26,12 +26,12 @@
/**
* The context for task attempts.
*/
-public abstract class TaskAttemptContext extends JobContext
- implements Progressable {
+public class TaskAttemptContext extends JobContext implements Progressable {
private final TaskAttemptID taskId;
private String status = "";
- public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+ public TaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId) {
super(conf, taskId.getJobID());
this.taskId = taskId;
}
@@ -39,7 +39,7 @@
/**
* Get the unique name for this task attempt.
*/
- public TaskAttemptID getTaskAttemptId() {
+ public TaskAttemptID getTaskAttemptID() {
return taskId;
}
@@ -59,13 +59,8 @@
}
/**
- * Lookup a counter by an enum.
+ * Report progress. This implementation does nothing; subclasses do the actual work.
*/
- public abstract Counter getCounter(Enum<?> counterName);
-
- /**
- * Lookup a counter by group and counter name. The enum-based interface is
- * preferred.
- */
- public abstract Counter getCounter(String groupName, String counterName);
+ public void progress() {
+ }
}
\ No newline at end of file
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskAttemptID.java Mon Dec 15 14:21:32 2008
@@ -33,7 +33,7 @@
* An example TaskAttemptID is :
* <code>attempt_200707121733_0003_m_000005_0</code> , which represents the
* zeroth task attempt for the fifth map task in the third job
- * running at the jobtracker started at <code>200707121733</code>.
+ * running at the jobtracker started at <code>200707121733</code>.
* <p>
* Applications should never construct or parse TaskAttemptID strings
* , but rather use appropriate constructors or {@link #forName(String)}
@@ -42,10 +42,9 @@
* @see JobID
* @see TaskID
*/
-public class TaskAttemptID extends ID {
- private static final String ATTEMPT = "attempt";
+public class TaskAttemptID extends org.apache.hadoop.mapred.ID {
+ protected static final String ATTEMPT = "attempt";
private TaskID taskId;
- private static final char UNDERSCORE = '_';
/**
* Constructs a TaskAttemptID object from given {@link TaskID}.
@@ -68,12 +67,14 @@
* @param taskId taskId number
* @param id the task attempt number
*/
- public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap
- , int taskId, int id) {
+ public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
+ int taskId, int id) {
this(new TaskID(jtIdentifier, jobId, isMap, taskId), id);
}
- private TaskAttemptID() { }
+ public TaskAttemptID() {
+ taskId = new TaskID();
+ }
/** Returns the {@link JobID} object that this task attempt belongs to */
public JobID getJobID() {
@@ -99,38 +100,19 @@
return this.taskId.equals(that.taskId);
}
- /**Compare TaskIds by first tipIds, then by task numbers. */
- @Override
- public int compareTo(ID o) {
- TaskAttemptID that = (TaskAttemptID)o;
- int tipComp = this.taskId.compareTo(that.taskId);
- if(tipComp == 0) {
- return this.id - that.id;
- }
- else return tipComp;
- }
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- return builder.append(ATTEMPT).append(UNDERSCORE)
- .append(toStringWOPrefix()).toString();
- }
-
- StringBuilder toStringWOPrefix() {
- StringBuilder builder = new StringBuilder();
- return builder.append(taskId.toStringWOPrefix())
- .append(UNDERSCORE).append(id);
- }
-
- @Override
- public int hashCode() {
- return toStringWOPrefix().toString().hashCode();
+ /**
+ * Add the unique string to the StringBuilder
+ * @param builder the builder to append to
+ * @return the builder that was passed in.
+ */
+ protected StringBuilder appendTo(StringBuilder builder) {
+ return taskId.appendTo(builder).append(SEPARATOR).append(id);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- this.taskId = TaskID.read(in);
+ taskId.readFields(in);
}
@Override
@@ -138,72 +120,55 @@
super.write(out);
taskId.write(out);
}
-
- public static TaskAttemptID read(DataInput in) throws IOException {
- TaskAttemptID taskId = new TaskAttemptID();
- taskId.readFields(in);
- return taskId;
+
+ @Override
+ public int hashCode() {
+ return taskId.hashCode() * 5 + id;
}
+ /**Compare TaskIds by first tipIds, then by task numbers. */
+ @Override
+ public int compareTo(ID o) {
+ TaskAttemptID that = (TaskAttemptID)o;
+ int tipComp = this.taskId.compareTo(that.taskId);
+ if(tipComp == 0) {
+ return this.id - that.id;
+ }
+ else return tipComp;
+ }
+ @Override
+ public String toString() {
+ return appendTo(new StringBuilder(ATTEMPT)).toString();
+ }
+
/** Construct a TaskAttemptID object from given string
* @return constructed TaskAttemptID object or null if the given String is null
* @throws IllegalArgumentException if the given string is malformed
*/
- public static TaskAttemptID forName(String str) throws IllegalArgumentException {
+ public static TaskAttemptID forName(String str
+ ) throws IllegalArgumentException {
if(str == null)
return null;
try {
- String[] parts = str.split("_");
+ String[] parts = str.split(Character.toString(SEPARATOR));
if(parts.length == 6) {
if(parts[0].equals(ATTEMPT)) {
boolean isMap = false;
if(parts[3].equals("m")) isMap = true;
else if(parts[3].equals("r")) isMap = false;
else throw new Exception();
- return new TaskAttemptID(parts[1], Integer.parseInt(parts[2]),
- isMap, Integer.parseInt(parts[4]), Integer.parseInt(parts[5]));
+ return new org.apache.hadoop.mapred.TaskAttemptID
+ (parts[1],
+ Integer.parseInt(parts[2]),
+ isMap, Integer.parseInt(parts[4]),
+ Integer.parseInt(parts[5]));
}
}
- }catch (Exception ex) {//fall below
+ } catch (Exception ex) {
+ //fall below
}
throw new IllegalArgumentException("TaskAttemptId string : " + str
+ " is not properly formed");
}
-
- /**
- * Returns a regex pattern which matches task attempt IDs. Arguments can
- * be given null, in which case that part of the regex will be generic.
- * For example to obtain a regex matching <i>all task attempt IDs</i>
- * of <i>any jobtracker</i>, in <i>any job</i>, of the <i>first
- * map task</i>, we would use :
- * <pre>
- * TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null);
- * </pre>
- * which will return :
- * <pre> "attempt_[^_]*_[0-9]*_m_000001_[0-9]*" </pre>
- * @param jtIdentifier jobTracker identifier, or null
- * @param jobId job number, or null
- * @param isMap whether the tip is a map, or null
- * @param taskId taskId number, or null
- * @param attemptId the task attempt number, or null
- * @return a regex pattern matching TaskAttemptIDs
- */
- public static String getTaskAttemptIDsPattern(String jtIdentifier,
- Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
- StringBuilder builder = new StringBuilder(ATTEMPT).append(UNDERSCORE);
- builder.append(getTaskAttemptIDsPatternWOPrefix(jtIdentifier, jobId,
- isMap, taskId, attemptId));
- return builder.toString();
- }
-
- static StringBuilder getTaskAttemptIDsPatternWOPrefix(String jtIdentifier
- , Integer jobId, Boolean isMap, Integer taskId, Integer attemptId) {
- StringBuilder builder = new StringBuilder();
- builder.append(TaskID.getTaskIDsPatternWOPrefix(jtIdentifier
- , jobId, isMap, taskId))
- .append(UNDERSCORE)
- .append(attemptId != null ? attemptId : "[0-9]*");
- return builder;
- }
-
+
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskID.java Mon Dec 15 14:21:32 2008
@@ -45,10 +45,9 @@
* @see JobID
* @see TaskAttemptID
*/
-public class TaskID extends ID {
- private static final String TASK = "task";
- private static char UNDERSCORE = '_';
- private static NumberFormat idFormat = NumberFormat.getInstance();
+public class TaskID extends org.apache.hadoop.mapred.ID {
+ protected static final String TASK = "task";
+ protected static final NumberFormat idFormat = NumberFormat.getInstance();
static {
idFormat.setGroupingUsed(false);
idFormat.setMinimumIntegerDigits(6);
@@ -83,7 +82,9 @@
this(new JobID(jtIdentifier, jobId), isMap, id);
}
- private TaskID() { }
+ public TaskID() {
+ jobId = new JobID(); // allocated up front so readFields() can fill it in place
+ }
/** Returns the {@link JobID} object that this tip belongs to */
public JobID getJobID() {
@@ -118,31 +119,34 @@
}
else return jobComp;
}
-
@Override
public String toString() {
- StringBuilder builder = new StringBuilder();
- return builder.append(TASK).append(UNDERSCORE)
- .append(toStringWOPrefix()).toString();
+ return appendTo(new StringBuilder(TASK)).toString();
}
- StringBuilder toStringWOPrefix() {
- StringBuilder builder = new StringBuilder();
- builder.append(jobId.toStringWOPrefix())
- .append(isMap ? "_m_" : "_r_");
- return builder.append(idFormat.format(id));
+ /**
+ * Append everything after the "task" prefix (job part, m/r type, task number) to the builder.
+ * @param builder the builder to append to
+ * @return the builder that was passed in
+ */
+ protected StringBuilder appendTo(StringBuilder builder) {
+ return jobId.appendTo(builder).
+ append(SEPARATOR).
+ append(isMap ? 'm' : 'r').
+ append(SEPARATOR).
+ append(idFormat.format(id));
}
@Override
public int hashCode() {
- return toStringWOPrefix().toString().hashCode();
+ return jobId.hashCode() * 524287 + id; // 524287 = 2^19 - 1 (prime); isMap is not mixed in
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- this.jobId = JobID.read(in);
- this.isMap = in.readBoolean();
+ jobId.readFields(in); // fills the existing JobID in place, so jobId must be non-null here
+ isMap = in.readBoolean();
}
@Override
@@ -152,12 +156,6 @@
out.writeBoolean(isMap);
}
- public static TaskID read(DataInput in) throws IOException {
- TaskID tipId = new TaskID();
- tipId.readFields(in);
- return tipId;
- }
-
/** Construct a TaskID object from given string
* @return constructed TaskID object or null if the given String is null
* @throws IllegalArgumentException if the given string is malformed
@@ -174,8 +172,10 @@
if(parts[3].equals("m")) isMap = true;
else if(parts[3].equals("r")) isMap = false;
else throw new Exception();
- return new TaskID(parts[1], Integer.parseInt(parts[2]),
- isMap, Integer.parseInt(parts[4]));
+ return new org.apache.hadoop.mapred.TaskID(parts[1],
+ Integer.parseInt(parts[2]),
+ isMap,
+ Integer.parseInt(parts[4]));
}
}
}catch (Exception ex) {//fall below
@@ -184,37 +184,4 @@
+ " is not properly formed");
}
- /**
- * Returns a regex pattern which matches task IDs. Arguments can
- * be given null, in which case that part of the regex will be generic.
- * For example to obtain a regex matching <i>the first map task</i>
- * of <i>any jobtracker</i>, of <i>any job</i>, we would use :
- * <pre>
- * TaskID.getTaskIDsPattern(null, null, true, 1);
- * </pre>
- * which will return :
- * <pre> "task_[^_]*_[0-9]*_m_000001*" </pre>
- * @param jtIdentifier jobTracker identifier, or null
- * @param jobId job number, or null
- * @param isMap whether the tip is a map, or null
- * @param taskId taskId number, or null
- * @return a regex pattern matching TaskIDs
- */
- public static String getTaskIDsPattern(String jtIdentifier, Integer jobId
- , Boolean isMap, Integer taskId) {
- StringBuilder builder = new StringBuilder(TASK).append(UNDERSCORE)
- .append(getTaskIDsPatternWOPrefix(jtIdentifier, jobId, isMap, taskId));
- return builder.toString();
- }
-
- static StringBuilder getTaskIDsPatternWOPrefix(String jtIdentifier
- , Integer jobId, Boolean isMap, Integer taskId) {
- StringBuilder builder = new StringBuilder();
- builder.append(JobID.getJobIDsPatternWOPrefix(jtIdentifier, jobId))
- .append(UNDERSCORE)
- .append(isMap != null ? (isMap ? "m" : "r") : "(m|r)").append(UNDERSCORE)
- .append(taskId != null ? idFormat.format(taskId) : "[0-9]*");
- return builder;
- }
-
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/TaskInputOutputContext.java Mon Dec 15 14:21:32 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
/**
* A context object that allows input and output from the task. It is only
@@ -31,34 +32,73 @@
* @param <VALUEOUT> the output value type for the task
*/
public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
- extends TaskAttemptContext {
-
- public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid) {
+ extends TaskAttemptContext implements Progressable {
+ private RecordWriter<KEYOUT,VALUEOUT> output;
+ private StatusReporter reporter;
+ private OutputCommitter committer;
+
+ public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
+ RecordWriter<KEYOUT,VALUEOUT> output,
+ OutputCommitter committer,
+ StatusReporter reporter) {
super(conf, taskid);
+ this.output = output;
+ this.reporter = reporter;
+ this.committer = committer;
}
/**
- * Advance to the next key, returning null if at end.
- * @param key the key object to read in to, which may be null
- * @return the key object that was read into
+ * Advance to the next key, value pair.
+ * @return true if a key/value pair was read, false if there are no more
*/
- public abstract KEYIN nextKey(KEYIN key
- ) throws IOException, InterruptedException;
-
+ public abstract
+ boolean nextKeyValue() throws IOException, InterruptedException;
+
/**
- * Read the next value. Must be called after nextKey.
- * @param value the value object to read in to, which may be null
+ * Get the current key.
+ * @return the current key object or null if there isn't one
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract
+ KEYIN getCurrentKey() throws IOException, InterruptedException;
+
+ /**
+ * Get the current value.
* @return the value object that was read into
* @throws IOException
* @throws InterruptedException
*/
- public abstract VALUEIN nextValue(VALUEIN value
- ) throws IOException, InterruptedException;
+ public abstract VALUEIN getCurrentValue() throws IOException,
+ InterruptedException;
/**
* Generate an output key/value pair.
*/
- public abstract void collect(KEYOUT key, VALUEOUT value
- ) throws IOException, InterruptedException;
+ public void write(KEYOUT key, VALUEOUT value
+ ) throws IOException, InterruptedException {
+ output.write(key, value); // delegates to the RecordWriter given to the constructor
+ }
+ public Counter getCounter(Enum<?> counterName) {
+ return reporter.getCounter(counterName);
+ }
+
+ public Counter getCounter(String groupName, String counterName) {
+ return reporter.getCounter(groupName, counterName);
+ }
+
+ @Override
+ public void progress() {
+ reporter.progress();
+ }
+
+ @Override
+ public void setStatus(String status) {
+ reporter.setStatus(status);
+ }
+
+ public OutputCommitter getOutputCommitter() {
+ return committer;
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Mon Dec 15 14:21:32 2008
@@ -32,6 +32,7 @@
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
@@ -108,26 +109,37 @@
/**
* Set a PathFilter to be applied to the input paths for the map-reduce job.
- *
+ * @param job the job to modify
* @param filter the PathFilter class use for filtering the input paths.
*/
- public static void setInputPathFilter(Configuration conf,
+ public static void setInputPathFilter(Job job,
Class<? extends PathFilter> filter) {
- conf.setClass("mapred.input.pathFilter.class", filter, PathFilter.class);
+ job.getConfiguration().setClass("mapred.input.pathFilter.class", filter,
+ PathFilter.class);
}
- public static void setMinInputSplitSize(Configuration conf,
+ /**
+ * Set the minimum input split size (stored as "mapred.min.split.size").
+ * @param job the job to modify
+ * @param size the minimum size
+ */
+ public static void setMinInputSplitSize(Job job,
long size) {
- conf.setLong("mapred.min.split.size", size);
+ job.getConfiguration().setLong("mapred.min.split.size", size);
}
public static long getMinSplitSize(Configuration conf) {
return conf.getLong("mapred.min.split.size", 1L);
}
- public static void setMaxInputSplitSize(Configuration conf,
+ /**
+ * Set the maximum input split size (stored as "mapred.max.split.size").
+ * @param job the job to modify
+ * @param size the maximum split size
+ */
+ public static void setMaxInputSplitSize(Job job,
long size) {
- conf.setLong("mapred.max.split.size", size);
+ job.getConfiguration().setLong("mapred.max.split.size", size);
}
public static long getMaxSplitSize(Configuration conf) {
@@ -271,14 +283,14 @@
* Sets the given comma separated paths as the list of inputs
* for the map-reduce job.
*
- * @param conf Configuration of the job
+ * @param job the job to modify
* @param commaSeparatedPaths Comma separated paths to be set as
* the list of inputs for the map-reduce job.
*/
- public static void setInputPaths(Configuration conf,
+ public static void setInputPaths(Job job,
String commaSeparatedPaths
) throws IOException {
- setInputPaths(conf, StringUtils.stringToPath(
+ setInputPaths(job, StringUtils.stringToPath(
getPathStrings(commaSeparatedPaths)));
}
@@ -286,15 +298,15 @@
* Add the given comma separated paths to the list of inputs for
* the map-reduce job.
*
- * @param conf The configuration of the job
+ * @param job The job to modify
* @param commaSeparatedPaths Comma separated paths to be added to
* the list of inputs for the map-reduce job.
*/
- public static void addInputPaths(Configuration conf,
+ public static void addInputPaths(Job job,
String commaSeparatedPaths
) throws IOException {
for (String str : getPathStrings(commaSeparatedPaths)) {
- addInputPath(conf, new Path(str));
+ addInputPath(job, new Path(str)); // addInputPath qualifies each path against the job's FileSystem
}
}
@@ -302,12 +314,13 @@
* Set the array of {@link Path}s as the list of inputs
* for the map-reduce job.
*
- * @param conf Configuration of the job.
+ * @param job The job to modify
* @param inputPaths the {@link Path}s of the input directories/files
* for the map-reduce job.
*/
- public static void setInputPaths(Configuration conf,
+ public static void setInputPaths(Job job,
Path... inputPaths) throws IOException {
+ Configuration conf = job.getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path path = inputPaths[0].makeQualified(fs);
StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
@@ -322,12 +335,13 @@
/**
* Add a {@link Path} to the list of inputs for the map-reduce job.
*
- * @param conf The configuration of the job
+ * @param job The {@link Job} to modify
* @param path {@link Path} to be added to the list of inputs for
* the map-reduce job.
*/
- public static void addInputPath(Configuration conf,
+ public static void addInputPath(Job job,
Path path) throws IOException {
+ Configuration conf = job.getConfiguration();
FileSystem fs = FileSystem.get(conf);
path = path.makeQualified(fs);
String dirStr = StringUtils.escapeString(path.toString());
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java Mon Dec 15 14:21:32 2008
@@ -27,6 +27,7 @@
* by one.
*/
public class InvalidInputException extends IOException {
+ private static final long serialVersionUID = -380668190578456802L;
private List<IOException> problems;
/**
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java?rev=726850&r1=726849&r2=726850&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/input/LineRecordReader.java Mon Dec 15 14:21:32 2008
@@ -46,7 +46,9 @@
private long pos;
private long end;
private LineReader in;
- int maxLineLength;
+ private int maxLineLength;
+ private LongWritable key = null;
+ private Text value = null;
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
@@ -82,25 +84,21 @@
this.pos = start;
}
- public LongWritable nextKey(LongWritable key) throws IOException {
+ public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
- return key;
- }
-
- public Text nextValue(Text value) throws IOException {
if (value == null) {
value = new Text();
}
+ int newSize = 0;
while (pos < end) {
- int newSize = in.readLine(value, maxLineLength,
- Math.max((int)Math.min(Integer.MAX_VALUE,
- end-pos),
- maxLineLength));
+ newSize = in.readLine(value, maxLineLength,
+ Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
+ maxLineLength));
if (newSize == 0) {
- return null;
+ break;
}
pos += newSize;
if (newSize < maxLineLength) {
@@ -111,6 +109,22 @@
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
+ if (newSize == 0) {
+ key = null;
+ value = null;
+ return false;
+ } else {
+ return true;
+ }
+ }
+
+ @Override
+ public LongWritable getCurrentKey() {
+ return key; // null before the first nextKeyValue() call and after it returns false
+ }
+
+ @Override
+ public Text getCurrentValue() { // null before the first nextKeyValue() call and after it returns false
return value;
}