You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:50:03 UTC

[18/25] git commit: TEZ-1581. GroupByOrderByMRRTest no longer functional. (hitesh)

TEZ-1581. GroupByOrderByMRRTest no longer functional. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7d1303fa
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7d1303fa
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7d1303fa

Branch: refs/heads/TEZ-8
Commit: 7d1303fa606e700bd4d4b2a122a73a5badbbc889
Parents: edb841c
Author: Hitesh Shah <hi...@apache.org>
Authored: Fri Sep 12 15:18:05 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Fri Sep 12 15:18:05 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../examples/GroupByOrderByMRRTest.java         | 283 +++++++++++++------
 2 files changed, 205 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/7d1303fa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 59be260..3198323 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@ ALL CHANGES:
   TEZ-1569. Add tests for preemption
   TEZ-1580. Change TestOrderedWordCount to optionally use MR configs.
   TEZ-1524. Resolve user group information only if ACLs are enabled.
+  TEZ-1581. GroupByOrderByMRRTest no longer functional.
 
 Release 0.5.1: Unreleased
 

http://git-wip-us.apache.org/repos/asf/tez/blob/7d1303fa/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
index 939bea0..393faea 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java
@@ -18,34 +18,54 @@
 
 package org.apache.tez.mapreduce.examples;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.StringTokenizer;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.client.MRTezClient;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 /**
  * Simple example that does a GROUP BY ORDER BY in an MRR job
@@ -94,7 +114,7 @@ public class GroupByOrderByMRRTest extends Configured implements Tool {
     public void map(Object key, Text value, Context context
         ) throws IOException, InterruptedException {
       StringTokenizer itr = new StringTokenizer(value.toString());
-      String empName = "";
+      String empName;
       String deptName = "";
       if (itr.hasMoreTokens()) {
         empName = itr.nextToken();
@@ -149,25 +169,141 @@ public class GroupByOrderByMRRTest extends Configured implements Tool {
     }
   }
 
+  private static DAG createDAG(Configuration conf, Map<String, LocalResource> commonLocalResources,
+      Path stagingDir, String inputPath, String outputPath, boolean useMRSettings)
+      throws Exception {
+
+    Configuration mapStageConf = new JobConf(conf);
+    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
+        MyMapper.class.getName());
+
+    MRHelpers.translateMRConfToTez(mapStageConf);
+
+    Configuration iReduceStageConf = new JobConf(conf);
+    // TODO replace with auto-reduce parallelism
+    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
+    iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+        MyGroupByReducer.class.getName());
+    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
+        IntWritable.class.getName());
+    iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
+    MRHelpers.translateMRConfToTez(iReduceStageConf);
+
+    Configuration finalReduceConf = new JobConf(conf);
+    finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
+        MyOrderByNoOpReducer.class.getName());
+    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
+    MRHelpers.translateMRConfToTez(finalReduceConf);
+
+    MRHelpers.configureMRApiUsage(mapStageConf);
+    MRHelpers.configureMRApiUsage(iReduceStageConf);
+    MRHelpers.configureMRApiUsage(finalReduceConf);
+
+    List<Vertex> vertices = new ArrayList<Vertex>();
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
+    mapStageConf.writeXml(outputStream);
+    String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
+    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        TextInputFormat.class.getName());
+    mapStageConf.set(FileInputFormat.INPUT_DIR, inputPath);
+    mapStageConf.setBoolean("mapred.mapper.new-api", true);
+    DataSourceDescriptor dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration(
+        mapStageConf, stagingDir, true);
+
+    Vertex mapVertex;
+    ProcessorDescriptor mapProcessorDescriptor =
+        ProcessorDescriptor.create(MapProcessor.class.getName())
+            .setUserPayload(
+                TezUtils.createUserPayloadFromConf(mapStageConf))
+            .setHistoryText(mapStageHistoryText);
+    if (!useMRSettings) {
+      mapVertex = Vertex.create("initialmap", mapProcessorDescriptor);
+    } else {
+      mapVertex = Vertex.create("initialmap", mapProcessorDescriptor, -1,
+          MRHelpers.getResourceForMRMapper(mapStageConf));
+      mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper(mapStageConf));
+    }
+    mapVertex.addTaskLocalFiles(commonLocalResources)
+        .addDataSource("MRInput", dsd);
+    vertices.add(mapVertex);
+
+    ByteArrayOutputStream iROutputStream = new ByteArrayOutputStream(4096);
+    iReduceStageConf.writeXml(iROutputStream);
+    String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
+
+    ProcessorDescriptor iReduceProcessorDescriptor = ProcessorDescriptor.create(
+        ReduceProcessor.class.getName())
+        .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
+        .setHistoryText(iReduceStageHistoryText);
+
+    Vertex intermediateVertex;
+    if (!useMRSettings) {
+      intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor, 1);
+    } else {
+      intermediateVertex = Vertex.create("ireduce1", iReduceProcessorDescriptor,
+          1, MRHelpers.getResourceForMRReducer(iReduceStageConf));
+      intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(iReduceStageConf));
+    }
+    intermediateVertex.addTaskLocalFiles(commonLocalResources);
+    vertices.add(intermediateVertex);
+
+    ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
+    finalReduceConf.writeXml(finalReduceOutputStream);
+    String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
+    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
+    Vertex finalReduceVertex;
+
+    ProcessorDescriptor finalReduceProcessorDescriptor =
+        ProcessorDescriptor.create(
+            ReduceProcessor.class.getName())
+            .setUserPayload(finalReducePayload)
+            .setHistoryText(finalReduceStageHistoryText);
+    if (!useMRSettings) {
+      finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1);
+    } else {
+      finalReduceVertex = Vertex.create("finalreduce", finalReduceProcessorDescriptor, 1,
+          MRHelpers.getResourceForMRReducer(finalReduceConf));
+      finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer(finalReduceConf));
+    }
+    finalReduceVertex.addTaskLocalFiles(commonLocalResources);
+    finalReduceVertex.addDataSink("MROutput",
+        MROutputLegacy.createConfigBuilder(finalReduceConf, TextOutputFormat.class, outputPath)
+            .build());
+    vertices.add(finalReduceVertex);
+
+    DAG dag = DAG.create("groupbyorderbymrrtest");
+    for (Vertex v : vertices) {
+      dag.addVertex(v);
+    }
+
+    OrderedPartitionedKVEdgeConfig edgeConf1 = OrderedPartitionedKVEdgeConfig
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).setFromConfiguration(conf)
+        .configureInput().useLegacyInput().done().build();
+    dag.addEdge(
+        Edge.create(dag.getVertex("initialmap"), dag.getVertex("ireduce1"),
+            edgeConf1.createDefaultEdgeProperty()));
+
+    OrderedPartitionedKVEdgeConfig edgeConf2 = OrderedPartitionedKVEdgeConfig
+        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
+            HashPartitioner.class.getName()).setFromConfiguration(conf)
+        .configureInput().useLegacyInput().done().build();
+    dag.addEdge(
+        Edge.create(dag.getVertex("ireduce1"), dag.getVertex("finalreduce"),
+            edgeConf2.createDefaultEdgeProperty()));
+
+    return dag;
+  }
+
+
   @Override
   public int run(String[] args) throws Exception {
     Configuration conf = getConf();
 
-    // Configure intermediate reduces
-    conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
-
-    // Set reducer class for intermediate reduce
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.job.reduce.class"), MyGroupByReducer.class, Reducer.class);
-    // Set reducer output key class
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
-    // Set reducer output value class
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.map.output.value.class"), Text.class, Object.class);
-    conf.setInt(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
-        "mapreduce.job.reduces"), 2);
-
     String[] otherArgs = new GenericOptionsParser(conf, args).
         getRemainingArgs();
     if (otherArgs.length != 2) {
@@ -176,66 +312,55 @@ public class GroupByOrderByMRRTest extends Configured implements Tool {
       return 2;
     }
 
-    @SuppressWarnings("deprecation")
-    Job job = new Job(conf, "groupbyorderbymrrtest");
-
-    job.setJarByClass(GroupByOrderByMRRTest.class);
-
-    // Configure map
-    job.setMapperClass(MyMapper.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(IntWritable.class);
-
-    // Configure reduce
-    job.setReducerClass(MyOrderByNoOpReducer.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-    job.setNumReduceTasks(1);
-
-    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
-    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
-
-    job.submit();
-    JobID jobId = job.getJobID();
-    ApplicationId appId = TypeConverter.toYarn(jobId).getAppId();
-
-    DAGClient dagClient = MRTezClient.getDAGClient(appId, new TezConfiguration(conf), null);
-    DAGStatus dagStatus;
-    String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" };
-    while (true) {
-      dagStatus = dagClient.getDAGStatus(null);
-      if(dagStatus.getState() == DAGStatus.State.RUNNING ||
-         dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-         dagStatus.getState() == DAGStatus.State.FAILED ||
-         dagStatus.getState() == DAGStatus.State.KILLED ||
-         dagStatus.getState() == DAGStatus.State.ERROR) {
-        break;
-      }
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        // continue;
-      }
+    String inputPath = otherArgs[0];
+    String outputPath = otherArgs[1];
+
+    UserGroupInformation.setConfiguration(conf);
+
+    TezConfiguration tezConf = new TezConfiguration(conf);
+    FileSystem fs = FileSystem.get(conf);
+
+    if (fs.exists(new Path(outputPath))) {
+      throw new FileAlreadyExistsException("Output directory "
+          + outputPath + " already exists");
     }
 
-    while (dagStatus.getState() == DAGStatus.State.RUNNING) {
-      try {
-        ExampleDriver.printDAGStatus(dagClient, vNames);
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          // continue;
-        }
-        dagStatus = dagClient.getDAGStatus(null);
-      } catch (TezException e) {
-        LOG.fatal("Failed to get application progress. Exiting");
+    Map<String, LocalResource> localResources =
+        new TreeMap<String, LocalResource>();
+
+    String stagingDirStr =  conf.get(TezConfiguration.TEZ_AM_STAGING_DIR,
+        TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + Path.SEPARATOR +
+        Long.toString(System.currentTimeMillis());
+    Path stagingDir = new Path(stagingDirStr);
+    FileSystem pathFs = stagingDir.getFileSystem(tezConf);
+    pathFs.mkdirs(new Path(stagingDirStr));
+
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirStr);
+    stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
+
+    TezClient tezClient = TezClient.create("groupbyorderbymrrtest", tezConf);
+    tezClient.start();
+
+    LOG.info("Submitting groupbyorderbymrrtest DAG as a new Tez Application");
+
+    try {
+      DAG dag = createDAG(conf, localResources, stagingDir, inputPath, outputPath, true);
+
+      tezClient.waitTillReady();
+
+      DAGClient dagClient = tezClient.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
+      if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+        LOG.error("groupbyorderbymrrtest failed, state=" + dagStatus.getState()
+            + ", diagnostics=" + dagStatus.getDiagnostics());
         return -1;
       }
+      LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
+      return 0;
+    } finally {
+      tezClient.stop();
     }
-
-    ExampleDriver.printDAGStatus(dagClient, vNames);
-    LOG.info("Application completed. " + "FinalState=" + dagStatus.getState());
-    return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
   }
 
   public static void main(String[] args) throws Exception {