You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/25 01:34:26 UTC

svn commit: r1471779 - in /incubator/tez/branches/TEZ-1: tez-dag-api/src/main/java/org/apache/tez/dag/api/ tez-dag/src/main/java/org/apache/hadoop/mapred/ tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ tez-mapreduce/src/main/ja...

Author: sseth
Date: Wed Apr 24 23:34:26 2013
New Revision: 1471779

URL: http://svn.apache.org/r1471779
Log:
TEZ-80. Better handling of multi-stage config keys. Convert MRR config to Tez stage specific config in YARNRunner. (sseth)

Added:
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
Modified:
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
    incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java Wed Apr 24 23:34:26 2013
@@ -92,5 +92,4 @@ public class TezConfiguration extends Co
   };
 
   public static final String DAG_AM_PLAN_CONFIG_XML = "tez-dag.xml";
-
 }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Wed Apr 24 23:34:26 2013
@@ -409,12 +409,12 @@ public class YarnTezDagChild {
       Credentials credentials, Token<JobTokenIdentifier> jt,
       int appAttemptId)
       throws IOException, InterruptedException {
+
     Configuration jConf = new JobConf(MRJobConfig.JOB_CONF_FILE);
-    Configuration conf;
+    Configuration conf = MultiStageMRConfigUtil.getConfForVertex(jConf,
+        taskContext.getVertexName());
     
-    // TODO Post MRR. This structure will not allow randomly named vertices.
-    // Have the MRR client convert intermediate stage configuration to be based
-    // on vertex name.
+    // TOOD Post MRR
     // A single file per vertex will likely be a better solution. Does not
     // require translation - client can take care of this. Will work independent
     // of whether the configuration is for intermediate tasks or not. Has the
@@ -422,19 +422,6 @@ public class YarnTezDagChild {
     // need to write these files to hdfs, add them as local resources per
     // vertex. A solution like this may be more practical once it's possible to
     // submit configuration parameters to the AM and effectively tasks via RPC.
-    LOG.info("DEBUG: VertexName: " + taskContext.getVertexName());
-    if (MultiStageMRConfigUtil.isIntermediateReduceStage(taskContext
-        .getVertexName())) {
-      LOG.info("DEBUG: is intermediate stage");
-      int intermediateStageNum = MultiStageMRConfigUtil
-          .getIntermediateReduceStageNum(taskContext.getVertexName());
-      LOG.info("DEBUG: intermediateStageNum: " + intermediateStageNum);
-      conf = MultiStageMRConfigUtil.getIntermediateStageConf(jConf,
-          intermediateStageNum);
-      MultiStageMRConfigUtil.printConf(conf);
-    } else {
-      conf = jConf;
-    }
 
     // TODO Avoid all this extra config manipulation.
     final JobConf job = new JobConf(conf);

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java Wed Apr 24 23:34:26 2013
@@ -97,11 +97,16 @@ public class WordCountMRRTest {
     // Has to be set before initialzing job, since it creates a copy.
     // Alternately use JonConf. 
     conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
         "mapreduce.job.combine.class"), IntermediateReducer.class,
         Reducer.class);
-    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+    conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
         "mapreduce.job.reduce.class"), IntermediateReducer.class, Reducer.class);
+    conf.set(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
+        "mapreduce.map.output.value.class"), IntWritable.class.getName());
+    
+    
+    
     System.err.println("Running wordcountMrrTest");
     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
     if (otherArgs.length != 2) {

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Wed Apr 24 23:34:26 2013
@@ -22,24 +22,87 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.DAGConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
-
+import com.google.common.collect.Maps;
 
 public class DeprecatedKeys {
 
+  
+  
   // This could be done via deprecation.
+  /**
+   * Keys used by the DAG - mainly the AM. 
+   */
   private static Map<String, String> mrParamToDAGParamMap = new HashMap<String, String>();
 
-  public static Map<String, String> getMRToDAGParamMap() {
-    return Collections.unmodifiableMap(mrParamToDAGParamMap);
+  
+  public static enum MultiStageKeys {
+    INPUT, OUTPUT
   }
+  /**
+   * Keys which are used across an edge. i.e. by an Output-Input pair.
+   */
+  private static Map<String, Map<MultiStageKeys, String>> multiStageParamMap =
+      new HashMap<String, Map<MultiStageKeys, String>>();
+  
+  
+  /**
+   * Keys used by the engine.
+   */
+  private static Map<String, String> mrParamToEngineParamMap =
+      new HashMap<String, String>();
+
+  
  
   static {
-    addDeprecatedKeys();
+    populateMRToEngineParamMap();
+    populateMRToDagParamMap();
+    populateMultiStageParamMap();
+  }
+  
+  
+  private static void populateMultiStageParamMap() {
     
+    multiStageParamMap.put(
+        MRJobConfig.KEY_COMPARATOR,
+        getDeprecationMap(
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+    
+    multiStageParamMap.put(
+        MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        getDeprecationMap(
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS,
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS));
+    
+    multiStageParamMap.put(
+        MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        getDeprecationMap(
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS,
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS));
+    
+    multiStageParamMap.put(
+        MRJobConfig.MAP_OUTPUT_COMPRESS,
+        getDeprecationMap(
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED,
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
+    
+    multiStageParamMap.put(
+        MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+        getDeprecationMap(
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC,
+            TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
+  }
+  
+  private static Map<MultiStageKeys, String> getDeprecationMap(String inputKey, String outputKey) {
+    Map<MultiStageKeys, String>  m = Maps.newEnumMap(MultiStageKeys.class);
+    m.put(MultiStageKeys.INPUT, inputKey);
+    m.put(MultiStageKeys.OUTPUT, outputKey);
+    return m;
+  }
+  
+  private static void populateMRToDagParamMap() {
     mrParamToDAGParamMap.put(MRJobConfig.JOB_SUBMIT_DIR,
         TezConfiguration.JOB_SUBMIT_DIR);
     mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE,
@@ -71,78 +134,85 @@ public class DeprecatedKeys {
   public static void init() {
   }
   
-  private static void addDeprecatedKeys() {
+  private static void populateMRToEngineParamMap() {
     
-    _(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
+    registerMRToEngineKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
 
-    _(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
+    registerMRToEngineKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
     
-    _(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
+    registerMRToEngineKeyTranslation(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
     
-    _(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
+    registerMRToEngineKeyTranslation(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
         
-    _(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
+    registerMRToEngineKeyTranslation(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
 
-    _(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
-    
-    _(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
+    registerMRToEngineKeyTranslation(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
     
-    _(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
+    registerMRToEngineKeyTranslation(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
     
-    _(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
+    registerMRToEngineKeyTranslation(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
     
-    _(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    registerMRToEngineKeyTranslation(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
     
-    _(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
+    registerMRToEngineKeyTranslation(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     
-    _(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
+    registerMRToEngineKeyTranslation(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
     
-    _(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
+    registerMRToEngineKeyTranslation(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
     
-    _(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
+    registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
     
-    _(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
+    registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
     
-    _(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+    registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
     
-    _(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
     
-    _(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
     
-    _(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
     
-    _(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
     
-    _(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
     
-    _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+    registerMRToEngineKeyTranslation(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
     
-    _(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
     
-    _(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
     
-    _(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
     
-    _(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
+    registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
     
-    _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+    registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
     
-    _(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
+    registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
     
-    _("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
+    registerMRToEngineKeyTranslation(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
     
-    _(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
-    
-    // TODO Parameters which cannot be handled via deprecation. Have to be habdled via another translation layer.
-    //_(MRJobConfig.KEY_COMPARATOR, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS)
-    //_(MRJobConfig.MAP_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS)
-    //_(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS)
-    //_(MRJobConfig.MAP_OUTPUT_COMPRESS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED
-    //_(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC
+    registerMRToEngineKeyTranslation("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
     
+    registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
+  }
+
+  private static void registerMRToEngineKeyTranslation(String mrKey,
+      String tezKey) {
+    mrParamToEngineParamMap.put(mrKey, tezKey);
   }
 
-  private static void _(String oldKey, String newKey) {
-    Configuration.addDeprecation(oldKey, newKey);
+  public static Map<String, String> getMRToDAGParamMap() {
+    return Collections.unmodifiableMap(mrParamToDAGParamMap);
+  }
+
+  public static Map<String, String> getMRToEngineParamMap() {
+    return Collections.unmodifiableMap(mrParamToEngineParamMap);
   }
+
+  // TODO Ideally, multi-stage should not be exposed.
+  public static Map<String, Map<MultiStageKeys, String>> getMultiStageParamMap() {
+    return Collections.unmodifiableMap(multiStageParamMap);
+  }
+
 }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java Wed Apr 24 23:34:26 2013
@@ -644,6 +644,10 @@ public interface MRJobConfig {
 
   public static final String MRR_INTERMEDIATE_STAGE_PREFIX =
       "mrr.intermediate.stage.";
+  
+  // TODO Move this over to Tez
+  public static final String MRR_VERTEX_PREFIX = 
+      "mrr.vertex.";
 
   // Stage specific properties
   // Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java?rev=1471779&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java Wed Apr 24 23:34:26 2013
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
+
+public class MultiStageMRConfToTezTranslator {
+
+  private enum DeprecationReason {
+    DEPRECATED_DIRECT_TRANSLATION, DEPRECATED_MULTI_STAGE
+  }
+
+  // FIXME Add unit tests.
+  // This will convert configs to tez.<vertexName>.<OriginalProperty> for
+  // properties which it understands. Doing this for the initial and final task
+  // as well to verify functionality.
+  //
+
+  // TODO Set the cause properly.
+  public static Configuration convertMRToLinearTez(Configuration srcConf) {
+    Configuration newConf = new Configuration(srcConf);
+
+    int numIntermediateStages = MultiStageMRConfigUtil
+        .getNumIntermediateStages(srcConf);
+    boolean hasFinalReduceStage = (srcConf.getInt(MRJobConfig.NUM_REDUCES, 0) > 0);
+
+    // Assuming no 0 map jobs, and the first stage is always a map.
+    int totalStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
+    int numEdges = totalStages - 1;
+
+    Configuration[] allConfs = extractStageConfs(newConf, numEdges);
+
+    for (int i = 0; i < allConfs.length; i++) {
+      processDirectConversion(allConfs[i]);
+    }
+    for (int i = 0; i < allConfs.length - 1; i++) {
+      processMultiStageDepreaction(allConfs[i], allConfs[i + 1]);
+    }
+    // Unset unnecessary keys in the last stage. Will end up being called for
+    // single stage as well which should be harmless.
+    processMultiStageDepreaction(allConfs[allConfs.length - 1], null);
+
+    for (int i = 0; i < allConfs.length; i++) {
+      String vertexName;
+      if (i == 0) {
+        vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+      } else if (i == allConfs.length - 1) {
+        vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+      } else {
+        // Intermediate vertices start at 1
+        vertexName = MultiStageMRConfigUtil.getIntermediateStageVertexName(i);
+      }
+      MultiStageMRConfigUtil.addConfigurationForVertex(newConf, vertexName,
+          allConfs[i]);
+    }
+
+    return newConf;
+  }
+
+  /**
+   * Constructs a list containing individual configuration for each stage of the
+   * linear MR job, including the first map and last reduce if applicable.
+   */
+  private static Configuration[] extractStageConfs(Configuration conf,
+      int totalEdges) {
+    int numStages = totalEdges + 1;
+    Configuration confs[] = new Configuration[numStages];
+    // TODO Make moer efficient instead of multiple scans.
+    Configuration nonIntermediateConf = MultiStageMRConfigUtil
+        .getAndRemoveBasicNonIntermediateStageConf(conf);
+    if (numStages == 1) {
+      confs[0] = nonIntermediateConf;
+    } else {
+      confs[0] = nonIntermediateConf;
+      confs[numStages - 1] = new Configuration(nonIntermediateConf);
+    }
+    if (numStages > 2) {
+      for (int i = 1; i < numStages - 1; i++) {
+        confs[i] = MultiStageMRConfigUtil
+            .getAndRemoveBasicIntermediateStageConf(conf, i);
+      }
+    } else {
+
+    }
+
+    return confs;
+  }
+
+  private static void processDirectConversion(Configuration conf) {
+    for (Entry<String, String> dep : DeprecatedKeys.getMRToEngineParamMap()
+        .entrySet()) {
+      if (conf.get(dep.getKey()) != null) {
+        // TODO Deprecation reason does not seem to reflect in the config ?
+        conf.set(dep.getValue(), conf.get(dep.getKey()),
+            DeprecationReason.DEPRECATED_DIRECT_TRANSLATION.name());
+        conf.unset(dep.getKey());
+      }
+    }
+  }
+
+  private static void processMultiStageDepreaction(Configuration srcConf,
+      Configuration destConf) {
+
+    // All MR keys which need such translation are specified at src - hence,
+    // this is ok.
+    // No key exists in which the map is inferring something based on the reduce
+    // value.
+    for (Entry<String, Map<MultiStageKeys, String>> dep : DeprecatedKeys
+        .getMultiStageParamMap().entrySet()) {
+      if (srcConf.get(dep.getKey()) != null) {
+        if (destConf != null) {
+          srcConf.set(dep.getValue().get(MultiStageKeys.OUTPUT),
+              srcConf.get(dep.getKey()));
+          destConf.set(dep.getValue().get(MultiStageKeys.INPUT),
+              srcConf.get(dep.getKey()));
+          srcConf.unset(dep.getKey());
+        } else { // Last stage. Just remove the key reference.
+          srcConf.unset(dep.getKey());
+        }
+      }
+    }
+  }
+}

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java Wed Apr 24 23:34:26 2013
@@ -22,47 +22,54 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
-
-import com.google.common.base.Preconditions;
 
 public class MultiStageMRConfigUtil {
 
-  // TODO MRR FIXME based on conf format.
-  // Returns a complete conf object including non-intermediate stage conf.
+  //////////////////////////////////////////////////////////////////////////////
+  //                    Methods based on Stage Num                            //
+  //////////////////////////////////////////////////////////////////////////////
+
+  // Returns config settings specific to stage
+  public static Configuration getBasicIntermediateStageConf(
+      Configuration baseConf, int i) {
+    return getBasicIntermediateStageConfInternal(baseConf,
+        getPropertyNameForIntermediateStage(i, ""), false, true);
+  }
+
+  // Returns and removes config settings specific to stage
+  public static Configuration getAndRemoveBasicIntermediateStageConf(
+      Configuration baseConf, int i) {
+    return getBasicIntermediateStageConfInternal(baseConf,
+        getPropertyNameForIntermediateStage(i, ""), true, true);
+  }
+
+  // TODO Get rid of this once YARNRunner starts using VertexNames.
   public static Configuration getIntermediateStageConf(Configuration baseConf,
       int i) {
-    String base = getPropertyNameForStage(i, "");
-    Configuration conf = new Configuration(false);
-    Iterator<Entry<String, String>> confEntries = baseConf.iterator();
-    while (confEntries.hasNext()) {
-      Entry<String, String> entry = confEntries.next();
-      String key = entry.getKey();
-      if (key.startsWith(base)) {
-        conf.set(key.replace(base, ""), entry.getValue());
-      } else {
-        conf.set(key, entry.getValue());
+    return getBasicIntermediateStageConfInternal(baseConf,
+        getPropertyNameForIntermediateStage(i, ""), false, false);
+  }
+
+  // FIXME small perf hit. Change this to parse through all keys once and
+  // generate objects per
+  // stage instead of scanning through conf multiple times.
+  public static Configuration getAndRemoveBasicNonIntermediateStageConf(
+      Configuration baseConf) {
+    Configuration newConf = new Configuration(false);
+    for (String key : DeprecatedKeys.getMRToEngineParamMap().keySet()) {
+      if (baseConf.get(key) != null) {
+        newConf.set(key, baseConf.get(key));
+        baseConf.unset(key);
       }
     }
-    return conf;
-  }
 
-  // TODO MRR FIXME based on conf format.
-  // Returns config settings specific to stage-i only.
-  public static Configuration getBasicIntermediateStageConf(
-      Configuration baseConf, int i) {
-    String base = getPropertyNameForStage(i, "");
-    Configuration conf = new Configuration(false);
-    Iterator<Entry<String, String>> confEntries = baseConf.iterator();
-    while (confEntries.hasNext()) {
-      Entry<String, String> entry = confEntries.next();
-      String key = entry.getKey();
-      if (key.startsWith(base)) {
-        conf.set(key.replace(base, ""), entry.getValue());
+    for (String key : DeprecatedKeys.getMultiStageParamMap().keySet()) {
+      if (baseConf.get(key) != null) {
+        newConf.set(key, baseConf.get(key));
+        baseConf.unset(key);
       }
     }
-    return conf;
+    return newConf;
   }
 
   // TODO MRR FIXME based on conf format.
@@ -71,40 +78,70 @@ public class MultiStageMRConfigUtil {
   }
 
   // TODO MRR FIXME based on conf format.
-  public static String getPropertyNameForStage(int intermediateStage,
-      String originalPropertyName) {
+  // Intermediate stage numbers should start from 1.
+  public static String getPropertyNameForIntermediateStage(
+      int intermediateStage, String originalPropertyName) {
     return MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX + intermediateStage + "."
         + originalPropertyName;
   }
+ 
+ //////////////////////////////////////////////////////////////////////////////
+ //                  Methods based on Vertex Name                            //
+ //////////////////////////////////////////////////////////////////////////////
+  
+  private static final String INITIAL_MAP_VERTEX_NAME = "initialmap";
+  private static final String FINAL_REDUCE_VERTEX_NAME = "finalreduce";
+  private static final String INTERMEDIATE_TASK_VERTEX_NAME_PREFIX = "ivertex";
 
-  public static void main(String[] args) {
-    Configuration baseConf = new Configuration();
-    baseConf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
-    baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
-        "mapreduce.job.combine.class"), IntSumReducer.class, Reducer.class);
-    baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
-        "mapreduce.job.reduce.class"), IntSumReducer.class, Reducer.class);
+  public static String getInitialMapVertexName() {
+    return INITIAL_MAP_VERTEX_NAME;
+  }
 
-    Configuration conf = getBasicIntermediateStageConf(baseConf, 1);
-    printConf(conf);
+  public static String getFinalReduceVertexName() {
+    return FINAL_REDUCE_VERTEX_NAME;
   }
-  
-  private static String IREDUCE_PREFIX = "ireduce";
-  
-  public static String getIntermediateReduceVertexName(int i) {
-    return "ireduce" + i;
+
+  public static String getIntermediateStageVertexName(int stageNum) {
+    return INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + stageNum;
   }
 
-  public static boolean isIntermediateReduceStage(String vertexName) {
-    return vertexName.startsWith(IREDUCE_PREFIX);
+  // Returns config settings specific to named vertex
+  public static Configuration getBasicConfForVertex(Configuration baseConf,
+      String vertexName) {
+    return getBasicIntermediateStageConfInternal(baseConf,
+        getPropertyNameForVertex(vertexName, ""), false, true);
   }
-  
-  public static int getIntermediateReduceStageNum(String vertexName) {
-    Preconditions.checkArgument(vertexName.startsWith(IREDUCE_PREFIX),
-        "IntermediateReduce vertex name must start with prefix: "
-            + IREDUCE_PREFIX);
-    String stageNumString = vertexName.substring(IREDUCE_PREFIX.length());
-    return Integer.valueOf(stageNumString);
+
+  // Returns and removes config settings specific to named vertex
+  public static Configuration getAndRemoveBasicConfForVertex(
+      Configuration baseConf, String vertexName) {
+    return getBasicIntermediateStageConfInternal(baseConf,
+        getPropertyNameForVertex(vertexName, ""), true, true);
+  }
+
+  // Returns a config with all parameters, and vertex specific params moved to
+  // the top level.
+  public static Configuration getConfForVertex(Configuration baseConf,
+      String vertexName) {
+    return getBasicIntermediateStageConfInternal(baseConf,
+        getPropertyNameForVertex(vertexName, ""), false, false);
+  }
+
+  public static void addConfigurationForVertex(Configuration baseConf,
+      String vertexName, Configuration vertexConf) {
+    Iterator<Entry<String, String>> confEntries = vertexConf.iterator();
+    while (confEntries.hasNext()) {
+      Entry<String, String> entry = confEntries.next();
+      baseConf.set(getPropertyNameForVertex(vertexName, entry.getKey()),
+          entry.getValue());
+    }
+  }
+
+  // TODO This is TezEngineLand
+  public static String getPropertyNameForVertex(String vertexName,
+      String originalPropertyName) {
+    return MRJobConfig.MRR_VERTEX_PREFIX + vertexName + "."
+        + originalPropertyName;
   }
 
   // TODO Get rid of this. Temporary for testing.
@@ -117,4 +154,27 @@ public class MultiStageMRConfigUtil {
       System.err.println("Key: " + key + ", Value: " + value);
     }
   }
+  
+  // TODO MRR FIXME based on conf format.
+  private static Configuration getBasicIntermediateStageConfInternal(
+      Configuration baseConf, String prefix, boolean remove, boolean stageOnly) {
+    Configuration conf = new Configuration(false);
+    Iterator<Entry<String, String>> confEntries = baseConf.iterator();
+    while (confEntries.hasNext()) {
+      Entry<String, String> entry = confEntries.next();
+      String key = entry.getKey();
+      if (key.startsWith(prefix)) {
+        conf.set(key.replace(prefix, ""), entry.getValue());
+        if (remove) {
+          baseConf.unset(key);
+        }
+      } else if (!stageOnly) {
+        conf.set(key, entry.getValue());
+      }
+    }
+    return conf;
+  }
+
+
+  
 }

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Wed Apr 24 23:34:26 2013
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
@@ -510,8 +511,9 @@ public class YARNRunner implements Clien
     int stageNum = iReduceIndex + 1;
     Configuration conf = MultiStageMRConfigUtil.getIntermediateStageConf(jobConf, stageNum);
     int numTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    // Intermediate vertices start at 1.
     Vertex vertex = new Vertex(
-        MultiStageMRConfigUtil.getIntermediateReduceVertexName(stageNum),
+        MultiStageMRConfigUtil.getIntermediateStageVertexName(stageNum),
         "org.apache.tez.mapreduce.task.IntermediateTask", numTasks);
     
     Map<String, String> reduceEnv = new HashMap<String, String>();
@@ -575,7 +577,7 @@ public class YARNRunner implements Clien
     String mapProcessor = mapOnly ?
         "org.apache.tez.mapreduce.task.MapOnlyTask"
         : "org.apache.tez.mapreduce.task.InitialTask";
-    Vertex mapVertex = new Vertex("map", mapProcessor, numMaps);
+    Vertex mapVertex = new Vertex(MultiStageMRConfigUtil.getInitialMapVertexName(), mapProcessor, numMaps);
 
     // FIXME set up map environment
     Map<String, String> mapEnv = new HashMap<String, String>();
@@ -616,7 +618,7 @@ public class YARNRunner implements Clien
     if (numReduces > 0) {
       String reduceProcessor =
           "org.apache.tez.mapreduce.task.FinalTask";
-      Vertex reduceVertex = new Vertex("reduce", reduceProcessor, numReduces);
+      Vertex reduceVertex = new Vertex(MultiStageMRConfigUtil.getFinalReduceVertexName(), reduceProcessor, numReduces);
 
       // FIXME set up reduce environment
       Map<String, String> reduceEnv = new HashMap<String, String>();
@@ -825,6 +827,19 @@ public class YARNRunner implements Clien
     return appContext;
   }
 
+  private void writeTezConf(String jobSubmitDir, FileSystem fs,
+      Configuration tezConf) throws IOException {
+    Path dagConfFilePath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+    FSDataOutputStream tezConfOut = FileSystem.create(fs, dagConfFilePath,
+        new FsPermission(DAG_FILE_PERMISSION));
+    try {
+      tezConf.writeXml(tezConfOut);
+    } finally {
+      tezConfOut.close();
+    }
+  }
+
   @Override
   public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
   throws IOException, InterruptedException {
@@ -840,14 +855,18 @@ public class YARNRunner implements Clien
 
     FileSystem fs = FileSystem.get(conf);
     JobConf jobConf = new JobConf(new TezConfiguration(conf));
+    Configuration tezJobConf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+    
+    // This will replace job.xml in the staging dir.
+    writeTezConf(jobSubmitDir, fs, tezJobConf);    
 
     // FIXME set up job resources
     Map<String, LocalResource> jobLocalResources =
-        createJobLocalResources(jobConf, jobSubmitDir);
+        createJobLocalResources(tezJobConf, jobSubmitDir);
     DAG dag = createDAG(fs, jobId, jobConf, jobSubmitDir, ts,
         jobLocalResources);
     ApplicationSubmissionContext appContext =
-        createApplicationSubmissionContext(fs, dag, jobConf, jobSubmitDir, ts,
+        createApplicationSubmissionContext(fs, dag, tezJobConf, jobSubmitDir, ts,
             jobLocalResources);
 
     // Submit to ResourceManager