You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:11 UTC

[04/50] [abbrv] git commit: TEZ-472. Various fixes including NPE in shuffle when run MR jobs using mapred apis. (hitesh)

TEZ-472. Various fixes including NPE in shuffle when run MR jobs using mapred apis. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/9a7e7bcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/9a7e7bcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/9a7e7bcc

Branch: refs/heads/master
Commit: 9a7e7bccb3cdf1989945092c95c3b8e2a216946d
Parents: eb0f6ff
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 20 16:12:53 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 20 16:12:53 2013 -0700

----------------------------------------------------------------------
 .../engine/lib/input/ShuffledMergedInput.java   |   2 +-
 .../tez/mapreduce/examples/ExampleDriver.java   |   3 +
 .../tez/mapreduce/examples/MapredWordCount.java | 163 +++++++++++++++++++
 .../tez/mapreduce/examples/WordCount.java       |  14 +-
 .../tez/mapreduce/newoutput/SimpleOutput.java   |  23 ++-
 .../org/apache/tez/mapreduce/DAGJobStatus.java  |  12 +-
 6 files changed, 199 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index 91bb6d5..eccd119 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -122,7 +122,7 @@ public class ShuffledMergedInput implements LogicalInput {
    */
   @Override
   public KVReader getReader() throws IOException {
-    if (rawIter != null) {
+    if (rawIter == null) {
       try {
         waitForInputReady();
       } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index c9691ff..6c062a4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -41,6 +41,9 @@ public class ExampleDriver {
     try {
       pgd.addClass("wordcount", WordCount.class,
           "A map/reduce program that counts the words in the input files.");
+      pgd.addClass("mapredwordcount", MapredWordCount.class,
+          "A map/reduce program that counts the words in the input files"
+         + " using the mapred apis.");
       pgd.addClass("wordcountmrrtest", WordCountMRRTest.class,
           "A map/reduce program that counts the words in the input files."
           + " Map splits on spaces. First reduce splits on \".\"");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java
new file mode 100644
index 0000000..33aad89
--- /dev/null
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MapredWordCount.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This is an example Hadoop Map/Reduce application using the mapred apis.
+ * It reads the text input files, breaks each line into words
+ * and counts them. The output is a locally sorted list of words and the
+ * count of how often they occurred.
+ *
+ * To run: bin/hadoop jar examples.jar mapredwordcount
+ *            [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
+ */
+public class MapredWordCount extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(MapredWordCount.class);
+
+  /**
+   * Mapper that splits every input line into tokens and emits one
+   * (<b>word</b>, <b>1</b>) pair per token. A single {@link Text}
+   * instance is reused as the output key for all tokens.
+   */
+  public static class MapClass extends MapReduceBase
+    implements Mapper<LongWritable, Text, Text, IntWritable> {
+
+    private final static IntWritable one = new IntWritable(1);
+    private Text word = new Text();
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<Text, IntWritable> output,
+                    Reporter reporter) throws IOException {
+      // StringTokenizer's default delimiters split on whitespace.
+      final StringTokenizer tokens = new StringTokenizer(value.toString());
+      while (tokens.hasMoreTokens()) {
+        word.set(tokens.nextToken());
+        output.collect(word, one);
+      }
+    }
+  }
+
+  /** Reducer (also set as the combiner) that sums all counts of a key. */
+  public static class Reduce extends MapReduceBase
+    implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+    public void reduce(Text key, Iterator<IntWritable> values,
+                       OutputCollector<Text, IntWritable> output,
+                       Reporter reporter) throws IOException {
+      // Accumulate every partial count seen for this key.
+      int total = 0;
+      while (values.hasNext()) {
+        final IntWritable count = values.next();
+        total += count.get();
+      }
+      output.collect(key, new IntWritable(total));
+    }
+  }
+
+  static int printUsage() { // returns -1 so run() can hand it back as status
+    System.out.println("mapredwordcount [-m <maps>] [-r <reduces>] <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  /**
+   * The main driver for the word count map/reduce program: parses
+   * {@code [-m maps] [-r reduces] input output}, submits the job and waits.
+   * @return 0 once the job has run, -1 if the arguments are invalid
+   * @throws Exception if job submission or execution fails
+   */
+  public int run(String[] args) throws Exception {
+    // Use this class (not WordCount) so the job jar is resolved from it.
+    JobConf conf = new JobConf(getConf(), MapredWordCount.class);
+    conf.setJobName("wordcount");
+    LOG.info("Running WordCount job using mapred apis");
+
+    // the keys are words (strings), the values are counts (ints)
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(MapClass.class);
+    conf.setCombinerClass(Reduce.class);
+    conf.setReducerClass(Reduce.class);
+
+    List<String> other_args = new ArrayList<String>();
+    for(int i=0; i < args.length; ++i) {
+      try {
+        if ("-m".equals(args[i])) {
+          conf.setNumMapTasks(Integer.parseInt(args[++i]));
+        } else if ("-r".equals(args[i])) {
+          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+        } else {
+          other_args.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        LOG.error("Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) { // flag was last arg
+        LOG.error("Required parameter missing from " + args[i-1]);
+        return printUsage();
+      }
+    }
+    // Make sure there are exactly 2 parameters left.
+    if (other_args.size() != 2) {
+      LOG.error("Wrong number of parameters: " +
+          other_args.size() + " instead of 2.");
+      return printUsage();
+    }
+    FileInputFormat.setInputPaths(conf, other_args.get(0));
+    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
+
+    JobClient.runJob(conf);
+    return 0;
+  }
+
+  /** Command-line entry point; exits with the status returned by the tool. */
+  public static void main(String[] args) throws Exception {
+    final Configuration conf = new Configuration();
+    System.exit(ToolRunner.run(conf, new MapredWordCount(), args));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 5d02201..fc1103e 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
@@ -32,14 +33,13 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 public class WordCount {
-  
 
-  public static class TokenizerMapper 
+  public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
-    
+
     private final static IntWritable one = new IntWritable(1);
     private Text word = new Text();
-      
+
     public void map(Object key, Text value, Context context
                     ) throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString());
@@ -49,12 +49,12 @@ public class WordCount {
       }
     }
   }
-  
-  public static class IntSumReducer 
+
+  public static class IntSumReducer
        extends Reducer<Text,IntWritable,Text,IntWritable> {
     private IntWritable result = new IntWritable();
 
-    public void reduce(Text key, Iterable<IntWritable> values, 
+    public void reduce(Text key, Iterable<IntWritable> values,
                        Context context
                        ) throws IOException, InterruptedException {
       int sum = 0;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
index d00ffc0..1dd94e1 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
@@ -14,6 +14,7 @@ import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -22,7 +23,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -129,6 +129,12 @@ public class SimpleOutput implements LogicalOutput {
           (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
           outputContext.getTaskIndex()),
           outputContext.getTaskAttemptNumber());
+      jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+      jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+      jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+      jobConf.setInt(JobContext.TASK_PARTITION,
+          taskAttemptId.getTaskID().getId());
+      jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
 
       oldApiTaskAttemptContext =
           new org.apache.tez.mapreduce.hadoop.newmapred.TaskAttemptContextImpl(
@@ -137,7 +143,8 @@ public class SimpleOutput implements LogicalOutput {
       oldOutputFormat = jobConf.getOutputFormat();
 
       List<Statistics> matchedStats = null;
-      if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+      if (oldOutputFormat
+          instanceof org.apache.hadoop.mapred.FileOutputFormat) {
         matchedStats =
             Utils.getFsStatistics(
                 org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
@@ -194,11 +201,19 @@ public class SimpleOutput implements LogicalOutput {
         FileOutputFormat.setWorkOutputPath(job, outputPath);
       }
     }
-    this.committer.setupTask(newApiTaskAttemptContext);
+    if (useNewApi) {
+      this.committer.setupTask(newApiTaskAttemptContext);
+    } else {
+      this.committer.setupTask(oldApiTaskAttemptContext);
+    }
   }
 
   public boolean isCommitRequired() throws IOException {
-    return committer.needsTaskCommit(newApiTaskAttemptContext);
+    if (useNewApi) {
+      return committer.needsTaskCommit(newApiTaskAttemptContext);
+    } else {
+      return committer.needsTaskCommit(oldApiTaskAttemptContext);
+    }
   }
 
   private TaskAttemptContext createTaskAttemptContext() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/9a7e7bcc/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
index 1b264c0..0b768c0 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
@@ -42,14 +42,14 @@ public class DAGJobStatus extends JobStatus {
   private final String jobFile;
   private final DAGStatus dagStatus;
   private final ApplicationReport report;
-  
+
   public DAGJobStatus(ApplicationReport report, DAGStatus dagStatus, String jobFile) {
     super();
     this.dagStatus = dagStatus;
     this.jobFile = jobFile;
     this.report = report;
   }
-  
+
   @Override
   protected synchronized void setMapProgress(float p) {
     throw new UnsupportedOperationException();
@@ -149,7 +149,7 @@ public class DAGJobStatus extends JobStatus {
   @Override
   public synchronized float getCleanupProgress() {
     if (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-        dagStatus.getState() == DAGStatus.State.FAILED || 
+        dagStatus.getState() == DAGStatus.State.FAILED ||
         dagStatus.getState() == DAGStatus.State.KILLED ||
         dagStatus.getState() == DAGStatus.State.ERROR) {
       return 1.0f;
@@ -237,7 +237,7 @@ public class DAGJobStatus extends JobStatus {
   @Override
   public synchronized boolean isJobComplete() {
     return (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-        dagStatus.getState() == DAGStatus.State.FAILED || 
+        dagStatus.getState() == DAGStatus.State.FAILED ||
         dagStatus.getState() == DAGStatus.State.KILLED ||
         dagStatus.getState() == DAGStatus.State.ERROR);
   }
@@ -369,7 +369,7 @@ public class DAGJobStatus extends JobStatus {
     buffer.append("needed-mem" + getNeededMem());
     return buffer.toString();
   }
-  
+
   private float getProgress(String vertexName) {
     Progress progress = dagStatus.getVertexProgress().get(vertexName);
     if(progress == null) {
@@ -380,7 +380,7 @@ public class DAGJobStatus extends JobStatus {
     if(totalTasks != 0) {
       return progress.getSucceededTaskCount()/totalTasks;
     }
-    return 1;
+    return 0;
   }
 
 }