You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:11 UTC
[04/50] [abbrv] git commit: TEZ-472. Various fixes including NPE in
shuffle when run MR jobs using mapred apis. (hitesh)
TEZ-472. Various fixes including NPE in shuffle when run MR jobs using mapred apis. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9a7e7bcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9a7e7bcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9a7e7bcc
Branch: refs/heads/master
Commit: 9a7e7bccb3cdf1989945092c95c3b8e2a216946d
Parents: eb0f6ff
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 20 16:12:53 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 20 16:12:53 2013 -0700
----------------------------------------------------------------------
.../engine/lib/input/ShuffledMergedInput.java | 2 +-
.../tez/mapreduce/examples/ExampleDriver.java | 3 +
.../tez/mapreduce/examples/MapredWordCount.java | 163 +++++++++++++++++++
.../tez/mapreduce/examples/WordCount.java | 14 +-
.../tez/mapreduce/newoutput/SimpleOutput.java | 23 ++-
.../org/apache/tez/mapreduce/DAGJobStatus.java | 12 +-
6 files changed, 199 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index 91bb6d5..eccd119 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -122,7 +122,7 @@ public class ShuffledMergedInput implements LogicalInput {
*/
@Override
public KVReader getReader() throws IOException {
- if (rawIter != null) {
+ if (rawIter == null) {
try {
waitForInputReady();
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index c9691ff..6c062a4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -41,6 +41,9 @@ public class ExampleDriver {
try {
pgd.addClass("wordcount", WordCount.class,
"A map/reduce program that counts the words in the input files.");
+ pgd.addClass("mapredwordcount", MapredWordCount.class,
+ "A map/reduce program that counts the words in the input files"
+ + " using the mapred apis.");
pgd.addClass("wordcountmrrtest", WordCountMRRTest.class,
"A map/reduce program that counts the words in the input files."
+ " Map splits on spaces. First reduce splits on \".\"");
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java
new file mode 100644
index 0000000..33aad89
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This is an example Hadoop Map/Reduce application using the mapred apis.
+ * It reads the text input files, breaks each line into words
+ * and counts them. The output is a locally sorted list of words and the
+ * count of how often they occurred.
+ *
+ * To run: bin/hadoop jar examples.jar wordcount
+ * [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
+ */
+public class MapredWordCount extends Configured implements Tool {
+
+ private static final Log LOG = LogFactory.getLog(MapredWordCount.class);
+
+ /**
+ * Counts the words in each line.
+ * For each line of input, break the line into words and emit them as
+ * (<b>word</b>, <b>1</b>).
+ */
+ public static class MapClass extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+ private final static IntWritable one = new IntWritable(1);
+ private Text word = new Text();
+
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
+ String line = value.toString();
+ StringTokenizer itr = new StringTokenizer(line);
+ while (itr.hasMoreTokens()) {
+ word.set(itr.nextToken());
+ output.collect(word, one);
+ }
+ }
+ }
+
+ /**
+ * A reducer class that just emits the sum of the input values.
+ */
+ public static class Reduce extends MapReduceBase
+ implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+ public void reduce(Text key, Iterator<IntWritable> values,
+ OutputCollector<Text, IntWritable> output,
+ Reporter reporter) throws IOException {
+ int sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+ output.collect(key, new IntWritable(sum));
+ }
+ }
+
+ static int printUsage() {
+ System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+ /**
+ * The main driver for word count map/reduce program.
+ * Invoke this method to submit the map/reduce job.
+ * @throws IOException When there is communication problems with the
+ * job tracker.
+ */
+ public int run(String[] args) throws Exception {
+ JobConf conf = new JobConf(getConf(), WordCount.class);
+ conf.setJobName("wordcount");
+ LOG.info("Running WordCount job using mapred apis");
+
+ // the keys are words (strings)
+ conf.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(MapClass.class);
+ conf.setCombinerClass(Reduce.class);
+ conf.setReducerClass(Reduce.class);
+
+ List<String> other_args = new ArrayList<String>();
+ for(int i=0; i < args.length; ++i) {
+ try {
+ if ("-m".equals(args[i])) {
+ conf.setNumMapTasks(Integer.parseInt(args[++i]));
+ } else if ("-r".equals(args[i])) {
+ conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+ } else {
+ other_args.add(args[i]);
+ }
+ } catch (NumberFormatException except) {
+ LOG.error("Integer expected instead of " + args[i]);
+ return printUsage();
+ } catch (ArrayIndexOutOfBoundsException except) {
+ LOG.error("Required parameter missing from " + args[i-1]);
+ return printUsage();
+ }
+ }
+ // Make sure there are exactly 2 parameters left.
+ if (other_args.size() != 2) {
+ LOG.error("Wrong number of parameters: " +
+ other_args.size() + " instead of 2.");
+ return printUsage();
+ }
+ FileInputFormat.setInputPaths(conf, other_args.get(0));
+ FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(),
+ new MapredWordCount(), args);
+ System.exit(res);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 5d02201..fc1103e 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
@@ -32,14 +33,13 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
-
- public static class TokenizerMapper
+ public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
-
+
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
-
+
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
@@ -49,12 +49,12 @@ public class WordCount {
}
}
}
-
- public static class IntSumReducer
+
+ public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
- public void reduce(Text key, Iterable<IntWritable> values,
+ public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
index d00ffc0..1dd94e1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
@@ -14,6 +14,7 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -22,7 +23,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
@@ -129,6 +129,12 @@ public class SimpleOutput implements LogicalOutput {
(isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
outputContext.getTaskIndex()),
outputContext.getTaskAttemptNumber());
+ jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+ jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+ jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+ jobConf.setInt(JobContext.TASK_PARTITION,
+ taskAttemptId.getTaskID().getId());
+ jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
oldApiTaskAttemptContext =
new org.apache.tez.mapreduce.hadoop.newmapred.TaskAttemptContextImpl(
@@ -137,7 +143,8 @@ public class SimpleOutput implements LogicalOutput {
oldOutputFormat = jobConf.getOutputFormat();
List<Statistics> matchedStats = null;
- if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+ if (oldOutputFormat
+ instanceof org.apache.hadoop.mapred.FileOutputFormat) {
matchedStats =
Utils.getFsStatistics(
org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
@@ -194,11 +201,19 @@ public class SimpleOutput implements LogicalOutput {
FileOutputFormat.setWorkOutputPath(job, outputPath);
}
}
- this.committer.setupTask(newApiTaskAttemptContext);
+ if (useNewApi) {
+ this.committer.setupTask(newApiTaskAttemptContext);
+ } else {
+ this.committer.setupTask(oldApiTaskAttemptContext);
+ }
}
public boolean isCommitRequired() throws IOException {
- return committer.needsTaskCommit(newApiTaskAttemptContext);
+ if (useNewApi) {
+ return committer.needsTaskCommit(newApiTaskAttemptContext);
+ } else {
+ return committer.needsTaskCommit(oldApiTaskAttemptContext);
+ }
}
private TaskAttemptContext createTaskAttemptContext() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
index 1b264c0..0b768c0 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
@@ -42,14 +42,14 @@ public class DAGJobStatus extends JobStatus {
private final String jobFile;
private final DAGStatus dagStatus;
private final ApplicationReport report;
-
+
public DAGJobStatus(ApplicationReport report, DAGStatus dagStatus, String jobFile) {
super();
this.dagStatus = dagStatus;
this.jobFile = jobFile;
this.report = report;
}
-
+
@Override
protected synchronized void setMapProgress(float p) {
throw new UnsupportedOperationException();
@@ -149,7 +149,7 @@ public class DAGJobStatus extends JobStatus {
@Override
public synchronized float getCleanupProgress() {
if (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
- dagStatus.getState() == DAGStatus.State.FAILED ||
+ dagStatus.getState() == DAGStatus.State.FAILED ||
dagStatus.getState() == DAGStatus.State.KILLED ||
dagStatus.getState() == DAGStatus.State.ERROR) {
return 1.0f;
@@ -237,7 +237,7 @@ public class DAGJobStatus extends JobStatus {
@Override
public synchronized boolean isJobComplete() {
return (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
- dagStatus.getState() == DAGStatus.State.FAILED ||
+ dagStatus.getState() == DAGStatus.State.FAILED ||
dagStatus.getState() == DAGStatus.State.KILLED ||
dagStatus.getState() == DAGStatus.State.ERROR);
}
@@ -369,7 +369,7 @@ public class DAGJobStatus extends JobStatus {
buffer.append("needed-mem" + getNeededMem());
return buffer.toString();
}
-
+
private float getProgress(String vertexName) {
Progress progress = dagStatus.getVertexProgress().get(vertexName);
if(progress == null) {
@@ -380,7 +380,7 @@ public class DAGJobStatus extends JobStatus {
if(totalTasks != 0) {
return progress.getSucceededTaskCount()/totalTasks;
}
- return 1;
+ return 0;
}
}