You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/28 18:13:21 UTC

git commit: TEZ-689. Write wordcount in Tez API (bikas)

Updated Branches:
  refs/heads/master 97e9a5ef3 -> 0134ebaa7


TEZ-689. Write wordcount in Tez API (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/0134ebaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/0134ebaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/0134ebaa

Branch: refs/heads/master
Commit: 0134ebaa780beedb9fc218fa13ec911cc73e8e17
Parents: 97e9a5e
Author: Bikas Saha <bi...@apache.org>
Authored: Sat Dec 28 09:12:47 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat Dec 28 09:12:47 2013 -0800

----------------------------------------------------------------------
 .../tez/mapreduce/examples/WordCount.java       | 385 +++++++++++++++++--
 1 file changed, 345 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/0134ebaa/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index fc1103e..bb48e3f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -15,73 +15,378 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+import com.google.common.base.Preconditions;
 
 public class WordCount {
+  public static class TokenProcessor implements LogicalIOProcessor {
+    TezProcessorContext context;
+    IntWritable one = new IntWritable(1);
+    Text word = new Text();
 
-  public static class TokenizerMapper
-       extends Mapper<Object, Text, Text, IntWritable>{
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+        throws Exception {
+      this.context = processorContext;
+    }
+
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+    }
 
-    private final static IntWritable one = new IntWritable(1);
-    private Text word = new Text();
+    @Override
+    public void close() throws Exception {
+    }
 
-    public void map(Object key, Text value, Context context
-                    ) throws IOException, InterruptedException {
-      StringTokenizer itr = new StringTokenizer(value.toString());
-      while (itr.hasMoreTokens()) {
-        word.set(itr.nextToken());
-        context.write(word, one);
+    @Override
+    public void run(Map<String, LogicalInput> inputs,
+        Map<String, LogicalOutput> outputs) throws Exception {
+      Preconditions.checkArgument(inputs.size() == 1);
+      Preconditions.checkArgument(outputs.size() == 1);
+      MRInput input = (MRInput) inputs.values().iterator().next();
+      KeyValueReader kvReader = input.getReader();
+      OnFileSortedOutput output = (OnFileSortedOutput) outputs.values().iterator().next();
+      KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
+      while (kvReader.next()) {
+        StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
+        while (itr.hasMoreTokens()) {
+          word.set(itr.nextToken());
+          kvWriter.write(word, one);
+        }
       }
     }
+    
   }
+  
+  public static class SumProcessor implements LogicalIOProcessor {
+    TezProcessorContext context;
+    
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+        throws Exception {
+      this.context = processorContext;
+    }
 
-  public static class IntSumReducer
-       extends Reducer<Text,IntWritable,Text,IntWritable> {
-    private IntWritable result = new IntWritable();
+    @Override
+    public void handleEvents(List<Event> processorEvents) {
+    }
+
+    @Override
+    public void close() throws Exception {
+    }
 
-    public void reduce(Text key, Iterable<IntWritable> values,
-                       Context context
-                       ) throws IOException, InterruptedException {
-      int sum = 0;
-      for (IntWritable val : values) {
-        sum += val.get();
+    @Override
+    public void run(Map<String, LogicalInput> inputs,
+        Map<String, LogicalOutput> outputs) throws Exception {
+      Preconditions.checkArgument(inputs.size() == 1);
+      MROutput out = (MROutput) outputs.values().iterator().next();
+      KeyValueWriter kvWriter = out.getWriter();
+      KeyValuesReader kvReader = (KeyValuesReader) inputs.values().iterator().next().getReader();
+      while (kvReader.next()) {
+        Text word = (Text) kvReader.getCurrentKey();
+        int sum = 0;
+        for (Object value : kvReader.getCurrentValues()) {
+          sum += ((IntWritable) value).get();
+        }
+        kvWriter.write(word, new IntWritable(sum));
+      }
+      if (out.isCommitRequired()) {
+        while (!context.canCommit()) {
+          Thread.sleep(100);
+        }
+        out.commit();
       }
-      result.set(sum);
-      context.write(key, result);
+    }
+    
+  }
+  
+  private DAG createDAG(FileSystem fs, TezConfiguration tezConf,
+      Map<String, LocalResource> localResources, Path stagingDir,
+      String inputPath, String outputPath) throws IOException {
+    Configuration mapStageConf = new JobConf((Configuration)tezConf);
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        Text.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, 
+        TezGroupedSplitsInputFormat.class.getName());
+
+    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
+    mapStageConf.setBoolean("mapred.mapper.new-api", true);
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
+        null);
+
+    Configuration finalReduceConf = new JobConf((Configuration)tezConf);
+    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        Text.class.getName());
+    finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        TextOutputFormat.class.getName());
+    finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
+    finalReduceConf.setBoolean("mapred.mapper.new-api", true);
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
+        mapStageConf);
+
+    MRHelpers.doJobClientMagic(mapStageConf);
+    MRHelpers.doJobClientMagic(finalReduceConf);
+
+    byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+    byte[] mapInputPayload = MRHelpers.createMRInputPayloadWithGrouping(mapPayload, 
+            TextInputFormat.class.getName());
+    int numMaps = -1;
+    Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
+        TokenProcessor.class.getName()),
+        numMaps, MRHelpers.getMapResource(mapStageConf));
+    tokenizerVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
+    Map<String, String> mapEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
+    tokenizerVertex.setTaskEnvironment(mapEnv);
+    Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
+    InputDescriptor id = new InputDescriptor(MRInput.class.getName()).
+        setUserPayload(mapInputPayload);
+    tokenizerVertex.addInput("MRInput", id, initializerClazz);
+
+    byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+    Vertex summerVertex = new Vertex("summer",
+        new ProcessorDescriptor(
+            SumProcessor.class.getName()).setUserPayload(finalReducePayload),
+                1, MRHelpers.getReduceResource(finalReduceConf));
+    summerVertex.setJavaOpts(
+        MRHelpers.getReduceJavaOpts(finalReduceConf));
+    Map<String, String> reduceEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
+    summerVertex.setTaskEnvironment(reduceEnv);
+    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
+        .setUserPayload(finalReducePayload);
+    summerVertex.addOutput("MROutput", od);
+    
+    DAG dag = new DAG("WordCount");
+    dag.addVertex(tokenizerVertex)
+        .addVertex(summerVertex)
+        .addEdge(
+            new Edge(tokenizerVertex, summerVertex, new EdgeProperty(
+                DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+                SchedulingType.SEQUENTIAL, 
+                new OutputDescriptor(OnFileSortedOutput.class.getName())
+                        .setUserPayload(mapPayload), 
+                new InputDescriptor(ShuffledMergedInput.class.getName())
+                        .setUserPayload(finalReducePayload))));
+    return dag;  
+  }
+
+  private static void waitForTezSessionReady(TezSession tezSession)
+      throws IOException, TezException {
+      while (true) {
+        TezSessionStatus status = tezSession.getSessionStatus();
+        if (status.equals(TezSessionStatus.SHUTDOWN)) {
+          throw new RuntimeException("TezSession has already shutdown");
+        }
+        if (status.equals(TezSessionStatus.READY)) {
+          return;
+        }
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+    }
+
+  private static void printUsage() {
+    System.err.println("Usage: " + " wordcount <in1> <out1>");
+  }
+
+  private Credentials credentials = new Credentials();
+  
+  public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
+    System.out.println("Running WordCount");
+    // conf and UGI
+    TezConfiguration tezConf;
+    if (conf != null) {
+      tezConf = new TezConfiguration(conf);
+    } else {
+      tezConf = new TezConfiguration();
+    }
+    UserGroupInformation.setConfiguration(tezConf);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    TezClient tezClient = new TezClient(tezConf);
+    ApplicationId appId = tezClient.createApplication();
+    
+    // staging dir
+    FileSystem fs = FileSystem.get(tezConf);
+    String stagingDirStr = Path.SEPARATOR + "user" + Path.SEPARATOR
+        + user + Path.SEPARATOR+ ".staging" + Path.SEPARATOR
+        + Path.SEPARATOR + appId.toString();    
+    Path stagingDir = new Path(stagingDirStr);
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+    stagingDir = fs.makeQualified(stagingDir);
+    
+    // security
+    TokenCache.obtainTokensForNamenodes(credentials, new Path[] {stagingDir}, tezConf);
+    TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+
+    tezConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
+        MRHelpers.getMRAMJavaOpts(tezConf));
+
+    // No need to add jar containing this class as assumed to be part of
+    // the tez jars.
+
+    // TEZ-674 Obtain tokens based on the Input / Output paths. For now assuming staging dir
+    // is the same filesystem as the one used for Input/Output.
+    
+    TezSession tezSession = null;
+    AMConfiguration amConfig = new AMConfiguration(null,
+        null, tezConf, credentials);
+    
+    TezSessionConfiguration sessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    tezSession = new TezSession("WordCountSession", appId,
+        sessionConfig);
+    tezSession.start();
+
+    DAGStatus dagStatus = null;
+    DAGClient dagClient = null;
+    String[] vNames = { "tokenizer", "summer" };
+
+    Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+    try {
+        if (fs.exists(new Path(outputPath))) {
+          throw new FileAlreadyExistsException("Output directory "
+              + outputPath + " already exists");
+        }
+        
+        Map<String, LocalResource> localResources =
+          new TreeMap<String, LocalResource>();
+        
+        DAG dag = createDAG(fs, tezConf, localResources,
+            stagingDir, inputPath, outputPath);
+
+        waitForTezSessionReady(tezSession);
+        dagClient = tezSession.submitDAG(dag);
+        //dagClient = tezClient.submitDAGApplication(dag, amConfig);
+
+        // monitoring
+        while (true) {
+          dagStatus = dagClient.getDAGStatus(statusGetOpts);
+          if(dagStatus.getState() == DAGStatus.State.RUNNING ||
+              dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+              dagStatus.getState() == DAGStatus.State.FAILED ||
+              dagStatus.getState() == DAGStatus.State.KILLED ||
+              dagStatus.getState() == DAGStatus.State.ERROR) {
+            break;
+          }
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            // continue;
+          }
+        }
+
+
+        while (dagStatus.getState() == DAGStatus.State.RUNNING) {
+          try {
+            ExampleDriver.printDAGStatus(dagClient, vNames);
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              // continue;
+            }
+            dagStatus = dagClient.getDAGStatus(statusGetOpts);
+          } catch (TezException e) {
+            System.exit(-1);
+          }
+        }
+        ExampleDriver.printDAGStatus(dagClient, vNames,
+            true, true);
+        System.out.println("DAG completed. " + "FinalState=" + dagStatus.getState());
+        if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+          System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
+          return false;
+        }
+        return true;
+    } finally {
+      fs.delete(stagingDir, true);
+      tezSession.stop();
     }
   }
 
   public static void main(String[] args) throws Exception {
-    Configuration conf = new Configuration();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
-    if (otherArgs.length != 2) {
-      System.err.println("Usage: wordcount <in> <out>");
+    if ((args.length%2) != 0) {
+      printUsage();
       System.exit(2);
     }
-    Job job = new Job(conf, "word count");
-    job.setJarByClass(WordCount.class);
-    job.setMapperClass(TokenizerMapper.class);
-    job.setCombinerClass(IntSumReducer.class);
-    job.setReducerClass(IntSumReducer.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
+    WordCount job = new WordCount();
+    job.run(args[0], args[1], null);
   }
+
 }