You are viewing a plain-text version of this content. Its canonical link was a hyperlink in the original page and is not preserved in this text.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/25 01:34:26 UTC
svn commit: r1471779 - in /incubator/tez/branches/TEZ-1:
tez-dag-api/src/main/java/org/apache/tez/dag/api/
tez-dag/src/main/java/org/apache/hadoop/mapred/
tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/
tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/
tez-yarn-client/src/main/java/org/apache/tez/mapreduce/
Author: sseth
Date: Wed Apr 24 23:34:26 2013
New Revision: 1471779
URL: http://svn.apache.org/r1471779
Log:
TEZ-80. Better handling of multi-stage config keys. Convert MRR config to Tez stage specific config in YARNRunner. (sseth)
Added:
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
Modified:
incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java Wed Apr 24 23:34:26 2013
@@ -92,5 +92,4 @@ public class TezConfiguration extends Co
};
public static final String DAG_AM_PLAN_CONFIG_XML = "tez-dag.xml";
-
}
Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java Wed Apr 24 23:34:26 2013
@@ -409,12 +409,12 @@ public class YarnTezDagChild {
Credentials credentials, Token<JobTokenIdentifier> jt,
int appAttemptId)
throws IOException, InterruptedException {
+
Configuration jConf = new JobConf(MRJobConfig.JOB_CONF_FILE);
- Configuration conf;
+ Configuration conf = MultiStageMRConfigUtil.getConfForVertex(jConf,
+ taskContext.getVertexName());
- // TODO Post MRR. This structure will not allow randomly named vertices.
- // Have the MRR client convert intermediate stage configuration to be based
- // on vertex name.
+ // TOOD Post MRR
// A single file per vertex will likely be a better solution. Does not
// require translation - client can take care of this. Will work independent
// of whether the configuration is for intermediate tasks or not. Has the
@@ -422,19 +422,6 @@ public class YarnTezDagChild {
// need to write these files to hdfs, add them as local resources per
// vertex. A solution like this may be more practical once it's possible to
// submit configuration parameters to the AM and effectively tasks via RPC.
- LOG.info("DEBUG: VertexName: " + taskContext.getVertexName());
- if (MultiStageMRConfigUtil.isIntermediateReduceStage(taskContext
- .getVertexName())) {
- LOG.info("DEBUG: is intermediate stage");
- int intermediateStageNum = MultiStageMRConfigUtil
- .getIntermediateReduceStageNum(taskContext.getVertexName());
- LOG.info("DEBUG: intermediateStageNum: " + intermediateStageNum);
- conf = MultiStageMRConfigUtil.getIntermediateStageConf(jConf,
- intermediateStageNum);
- MultiStageMRConfigUtil.printConf(conf);
- } else {
- conf = jConf;
- }
// TODO Avoid all this extra config manipulation.
final JobConf job = new JobConf(conf);
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCountMRRTest.java Wed Apr 24 23:34:26 2013
@@ -97,11 +97,16 @@ public class WordCountMRRTest {
// Has to be set before initialzing job, since it creates a copy.
// Alternately use JonConf.
conf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
- conf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+ conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
"mapreduce.job.combine.class"), IntermediateReducer.class,
Reducer.class);
- conf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
+ conf.setClass(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
"mapreduce.job.reduce.class"), IntermediateReducer.class, Reducer.class);
+ conf.set(MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(1,
+ "mapreduce.map.output.value.class"), IntWritable.class.getName());
+
+
+
System.err.println("Running wordcountMrrTest");
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Wed Apr 24 23:34:26 2013
@@ -22,24 +22,87 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.api.DAGConfiguration;
import org.apache.tez.dag.api.TezConfiguration;
-
+import com.google.common.collect.Maps;
public class DeprecatedKeys {
+
+
// This could be done via deprecation.
+ /**
+ * Keys used by the DAG - mainly the AM.
+ */
private static Map<String, String> mrParamToDAGParamMap = new HashMap<String, String>();
- public static Map<String, String> getMRToDAGParamMap() {
- return Collections.unmodifiableMap(mrParamToDAGParamMap);
+
+ public static enum MultiStageKeys {
+ INPUT, OUTPUT
}
+ /**
+ * Keys which are used across an edge. i.e. by an Output-Input pair.
+ */
+ private static Map<String, Map<MultiStageKeys, String>> multiStageParamMap =
+ new HashMap<String, Map<MultiStageKeys, String>>();
+
+
+ /**
+ * Keys used by the engine.
+ */
+ private static Map<String, String> mrParamToEngineParamMap =
+ new HashMap<String, String>();
+
+
static {
- addDeprecatedKeys();
+ populateMRToEngineParamMap();
+ populateMRToDagParamMap();
+ populateMultiStageParamMap();
+ }
+
+
+ private static void populateMultiStageParamMap() {
+ multiStageParamMap.put(
+ MRJobConfig.KEY_COMPARATOR,
+ getDeprecationMap(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+
+ multiStageParamMap.put(
+ MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+ getDeprecationMap(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS,
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS));
+
+ multiStageParamMap.put(
+ MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+ getDeprecationMap(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS,
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS));
+
+ multiStageParamMap.put(
+ MRJobConfig.MAP_OUTPUT_COMPRESS,
+ getDeprecationMap(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED,
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
+
+ multiStageParamMap.put(
+ MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+ getDeprecationMap(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC,
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
+ }
+
+ private static Map<MultiStageKeys, String> getDeprecationMap(String inputKey, String outputKey) {
+ Map<MultiStageKeys, String> m = Maps.newEnumMap(MultiStageKeys.class);
+ m.put(MultiStageKeys.INPUT, inputKey);
+ m.put(MultiStageKeys.OUTPUT, outputKey);
+ return m;
+ }
+
+ private static void populateMRToDagParamMap() {
mrParamToDAGParamMap.put(MRJobConfig.JOB_SUBMIT_DIR,
TezConfiguration.JOB_SUBMIT_DIR);
mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE,
@@ -71,78 +134,85 @@ public class DeprecatedKeys {
public static void init() {
}
- private static void addDeprecatedKeys() {
+ private static void populateMRToEngineParamMap() {
- _(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
+ registerMRToEngineKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
- _(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
+ registerMRToEngineKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
- _(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
+ registerMRToEngineKeyTranslation(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
- _(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
+ registerMRToEngineKeyTranslation(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
- _(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
+ registerMRToEngineKeyTranslation(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
- _(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
-
- _(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
+ registerMRToEngineKeyTranslation(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
- _(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
+ registerMRToEngineKeyTranslation(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
- _(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
+ registerMRToEngineKeyTranslation(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
- _(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+ registerMRToEngineKeyTranslation(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
- _(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
+ registerMRToEngineKeyTranslation(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
- _(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
+ registerMRToEngineKeyTranslation(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
- _(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
+ registerMRToEngineKeyTranslation(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
- _(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
+ registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
- _(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
+ registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
- _(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+ registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
- _(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
- _(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
- _(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
- _(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
- _(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
- _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+ registerMRToEngineKeyTranslation(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
- _(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
- _(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
- _(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
- _(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
+ registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
- _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+ registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
- _(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
+ registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
- _("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
+ registerMRToEngineKeyTranslation(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
- _(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
-
- // TODO Parameters which cannot be handled via deprecation. Have to be habdled via another translation layer.
- //_(MRJobConfig.KEY_COMPARATOR, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS)
- //_(MRJobConfig.MAP_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS)
- //_(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS)
- //_(MRJobConfig.MAP_OUTPUT_COMPRESS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED
- //_(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC
+ registerMRToEngineKeyTranslation("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
+ registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
+ }
+
+ private static void registerMRToEngineKeyTranslation(String mrKey,
+ String tezKey) {
+ mrParamToEngineParamMap.put(mrKey, tezKey);
}
- private static void _(String oldKey, String newKey) {
- Configuration.addDeprecation(oldKey, newKey);
+ public static Map<String, String> getMRToDAGParamMap() {
+ return Collections.unmodifiableMap(mrParamToDAGParamMap);
+ }
+
+ public static Map<String, String> getMRToEngineParamMap() {
+ return Collections.unmodifiableMap(mrParamToEngineParamMap);
}
+
+ // TODO Ideally, multi-stage should not be exposed.
+ public static Map<String, Map<MultiStageKeys, String>> getMultiStageParamMap() {
+ return Collections.unmodifiableMap(multiStageParamMap);
+ }
+
}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java Wed Apr 24 23:34:26 2013
@@ -644,6 +644,10 @@ public interface MRJobConfig {
public static final String MRR_INTERMEDIATE_STAGE_PREFIX =
"mrr.intermediate.stage.";
+
+ // TODO Move this over to Tez
+ public static final String MRR_VERTEX_PREFIX =
+ "mrr.vertex.";
// Stage specific properties
// Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java?rev=1471779&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java Wed Apr 24 23:34:26 2013
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys.MultiStageKeys;
+
+public class MultiStageMRConfToTezTranslator {
+
+ private enum DeprecationReason {
+ DEPRECATED_DIRECT_TRANSLATION, DEPRECATED_MULTI_STAGE
+ }
+
+ // FIXME Add unit tests.
+ // This will convert configs to tez.<vertexName>.<OriginalProperty> for
+ // properties which it understands. Doing this for the initial and final task
+ // as well to verify functionality.
+ //
+
+ // TODO Set the cause properly.
+ public static Configuration convertMRToLinearTez(Configuration srcConf) {
+ Configuration newConf = new Configuration(srcConf);
+
+ int numIntermediateStages = MultiStageMRConfigUtil
+ .getNumIntermediateStages(srcConf);
+ boolean hasFinalReduceStage = (srcConf.getInt(MRJobConfig.NUM_REDUCES, 0) > 0);
+
+ // Assuming no 0 map jobs, and the first stage is always a map.
+ int totalStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
+ int numEdges = totalStages - 1;
+
+ Configuration[] allConfs = extractStageConfs(newConf, numEdges);
+
+ for (int i = 0; i < allConfs.length; i++) {
+ processDirectConversion(allConfs[i]);
+ }
+ for (int i = 0; i < allConfs.length - 1; i++) {
+ processMultiStageDepreaction(allConfs[i], allConfs[i + 1]);
+ }
+ // Unset unnecessary keys in the last stage. Will end up being called for
+ // single stage as well which should be harmless.
+ processMultiStageDepreaction(allConfs[allConfs.length - 1], null);
+
+ for (int i = 0; i < allConfs.length; i++) {
+ String vertexName;
+ if (i == 0) {
+ vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+ } else if (i == allConfs.length - 1) {
+ vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+ } else {
+ // Intermediate vertices start at 1
+ vertexName = MultiStageMRConfigUtil.getIntermediateStageVertexName(i);
+ }
+ MultiStageMRConfigUtil.addConfigurationForVertex(newConf, vertexName,
+ allConfs[i]);
+ }
+
+ return newConf;
+ }
+
+ /**
+ * Constructs a list containing individual configuration for each stage of the
+ * linear MR job, including the first map and last reduce if applicable.
+ */
+ private static Configuration[] extractStageConfs(Configuration conf,
+ int totalEdges) {
+ int numStages = totalEdges + 1;
+ Configuration confs[] = new Configuration[numStages];
+ // TODO Make moer efficient instead of multiple scans.
+ Configuration nonIntermediateConf = MultiStageMRConfigUtil
+ .getAndRemoveBasicNonIntermediateStageConf(conf);
+ if (numStages == 1) {
+ confs[0] = nonIntermediateConf;
+ } else {
+ confs[0] = nonIntermediateConf;
+ confs[numStages - 1] = new Configuration(nonIntermediateConf);
+ }
+ if (numStages > 2) {
+ for (int i = 1; i < numStages - 1; i++) {
+ confs[i] = MultiStageMRConfigUtil
+ .getAndRemoveBasicIntermediateStageConf(conf, i);
+ }
+ } else {
+
+ }
+
+ return confs;
+ }
+
+ private static void processDirectConversion(Configuration conf) {
+ for (Entry<String, String> dep : DeprecatedKeys.getMRToEngineParamMap()
+ .entrySet()) {
+ if (conf.get(dep.getKey()) != null) {
+ // TODO Deprecation reason does not seem to reflect in the config ?
+ conf.set(dep.getValue(), conf.get(dep.getKey()),
+ DeprecationReason.DEPRECATED_DIRECT_TRANSLATION.name());
+ conf.unset(dep.getKey());
+ }
+ }
+ }
+
+ private static void processMultiStageDepreaction(Configuration srcConf,
+ Configuration destConf) {
+
+ // All MR keys which need such translation are specified at src - hence,
+ // this is ok.
+ // No key exists in which the map is inferring something based on the reduce
+ // value.
+ for (Entry<String, Map<MultiStageKeys, String>> dep : DeprecatedKeys
+ .getMultiStageParamMap().entrySet()) {
+ if (srcConf.get(dep.getKey()) != null) {
+ if (destConf != null) {
+ srcConf.set(dep.getValue().get(MultiStageKeys.OUTPUT),
+ srcConf.get(dep.getKey()));
+ destConf.set(dep.getValue().get(MultiStageKeys.INPUT),
+ srcConf.get(dep.getKey()));
+ srcConf.unset(dep.getKey());
+ } else { // Last stage. Just remove the key reference.
+ srcConf.unset(dep.getKey());
+ }
+ }
+ }
+ }
+}
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java Wed Apr 24 23:34:26 2013
@@ -22,47 +22,54 @@ import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
-
-import com.google.common.base.Preconditions;
public class MultiStageMRConfigUtil {
- // TODO MRR FIXME based on conf format.
- // Returns a complete conf object including non-intermediate stage conf.
+ //////////////////////////////////////////////////////////////////////////////
+ // Methods based on Stage Num //
+ //////////////////////////////////////////////////////////////////////////////
+
+ // Returns config settings specific to stage
+ public static Configuration getBasicIntermediateStageConf(
+ Configuration baseConf, int i) {
+ return getBasicIntermediateStageConfInternal(baseConf,
+ getPropertyNameForIntermediateStage(i, ""), false, true);
+ }
+
+ // Returns and removes config settings specific to stage
+ public static Configuration getAndRemoveBasicIntermediateStageConf(
+ Configuration baseConf, int i) {
+ return getBasicIntermediateStageConfInternal(baseConf,
+ getPropertyNameForIntermediateStage(i, ""), true, true);
+ }
+
+ // TODO Get rid of this once YARNRunner starts using VertexNames.
public static Configuration getIntermediateStageConf(Configuration baseConf,
int i) {
- String base = getPropertyNameForStage(i, "");
- Configuration conf = new Configuration(false);
- Iterator<Entry<String, String>> confEntries = baseConf.iterator();
- while (confEntries.hasNext()) {
- Entry<String, String> entry = confEntries.next();
- String key = entry.getKey();
- if (key.startsWith(base)) {
- conf.set(key.replace(base, ""), entry.getValue());
- } else {
- conf.set(key, entry.getValue());
+ return getBasicIntermediateStageConfInternal(baseConf,
+ getPropertyNameForIntermediateStage(i, ""), false, false);
+ }
+
+ // FIXME small perf hit. Change this to parse through all keys once and
+ // generate objects per
+ // stage instead of scanning through conf multiple times.
+ public static Configuration getAndRemoveBasicNonIntermediateStageConf(
+ Configuration baseConf) {
+ Configuration newConf = new Configuration(false);
+ for (String key : DeprecatedKeys.getMRToEngineParamMap().keySet()) {
+ if (baseConf.get(key) != null) {
+ newConf.set(key, baseConf.get(key));
+ baseConf.unset(key);
}
}
- return conf;
- }
- // TODO MRR FIXME based on conf format.
- // Returns config settings specific to stage-i only.
- public static Configuration getBasicIntermediateStageConf(
- Configuration baseConf, int i) {
- String base = getPropertyNameForStage(i, "");
- Configuration conf = new Configuration(false);
- Iterator<Entry<String, String>> confEntries = baseConf.iterator();
- while (confEntries.hasNext()) {
- Entry<String, String> entry = confEntries.next();
- String key = entry.getKey();
- if (key.startsWith(base)) {
- conf.set(key.replace(base, ""), entry.getValue());
+ for (String key : DeprecatedKeys.getMultiStageParamMap().keySet()) {
+ if (baseConf.get(key) != null) {
+ newConf.set(key, baseConf.get(key));
+ baseConf.unset(key);
}
}
- return conf;
+ return newConf;
}
// TODO MRR FIXME based on conf format.
@@ -71,40 +78,70 @@ public class MultiStageMRConfigUtil {
}
// TODO MRR FIXME based on conf format.
- public static String getPropertyNameForStage(int intermediateStage,
- String originalPropertyName) {
+ // Intermediate stage numbers should start from 1.
+ public static String getPropertyNameForIntermediateStage(
+ int intermediateStage, String originalPropertyName) {
return MRJobConfig.MRR_INTERMEDIATE_STAGE_PREFIX + intermediateStage + "."
+ originalPropertyName;
}
+
+ //////////////////////////////////////////////////////////////////////////////
+ // Methods based on Vertex Name //
+ //////////////////////////////////////////////////////////////////////////////
+
+ private static final String INITIAL_MAP_VERTEX_NAME = "initialmap";
+ private static final String FINAL_REDUCE_VERTEX_NAME = "finalreduce";
+ private static final String INTERMEDIATE_TASK_VERTEX_NAME_PREFIX = "ivertex";
- public static void main(String[] args) {
- Configuration baseConf = new Configuration();
- baseConf.setInt(MRJobConfig.MRR_INTERMEDIATE_STAGES, 1);
- baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
- "mapreduce.job.combine.class"), IntSumReducer.class, Reducer.class);
- baseConf.setClass(MultiStageMRConfigUtil.getPropertyNameForStage(1,
- "mapreduce.job.reduce.class"), IntSumReducer.class, Reducer.class);
+ public static String getInitialMapVertexName() {
+ return INITIAL_MAP_VERTEX_NAME;
+ }
- Configuration conf = getBasicIntermediateStageConf(baseConf, 1);
- printConf(conf);
+ public static String getFinalReduceVertexName() {
+ return FINAL_REDUCE_VERTEX_NAME;
}
-
- private static String IREDUCE_PREFIX = "ireduce";
-
- public static String getIntermediateReduceVertexName(int i) {
- return "ireduce" + i;
+
+ public static String getIntermediateStageVertexName(int stageNum) {
+ return INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + stageNum;
}
- public static boolean isIntermediateReduceStage(String vertexName) {
- return vertexName.startsWith(IREDUCE_PREFIX);
+ // Returns config settings specific to named vertex
+ public static Configuration getBasicConfForVertex(Configuration baseConf,
+ String vertexName) {
+ return getBasicIntermediateStageConfInternal(baseConf,
+ getPropertyNameForVertex(vertexName, ""), false, true);
}
-
- public static int getIntermediateReduceStageNum(String vertexName) {
- Preconditions.checkArgument(vertexName.startsWith(IREDUCE_PREFIX),
- "IntermediateReduce vertex name must start with prefix: "
- + IREDUCE_PREFIX);
- String stageNumString = vertexName.substring(IREDUCE_PREFIX.length());
- return Integer.valueOf(stageNumString);
+
+ // Returns and removes config settings specific to named vertex
+ public static Configuration getAndRemoveBasicConfForVertex(
+ Configuration baseConf, String vertexName) {
+ return getBasicIntermediateStageConfInternal(baseConf,
+ getPropertyNameForVertex(vertexName, ""), true, true);
+ }
+
+ // Returns a config with all parameters, and vertex specific params moved to
+ // the top level.
+ public static Configuration getConfForVertex(Configuration baseConf,
+ String vertexName) {
+ return getBasicIntermediateStageConfInternal(baseConf,
+ getPropertyNameForVertex(vertexName, ""), false, false);
+ }
+
+ public static void addConfigurationForVertex(Configuration baseConf,
+ String vertexName, Configuration vertexConf) {
+ Iterator<Entry<String, String>> confEntries = vertexConf.iterator();
+ while (confEntries.hasNext()) {
+ Entry<String, String> entry = confEntries.next();
+ baseConf.set(getPropertyNameForVertex(vertexName, entry.getKey()),
+ entry.getValue());
+ }
+ }
+
+ // TODO This is TezEngineLand
+ public static String getPropertyNameForVertex(String vertexName,
+ String originalPropertyName) {
+ return MRJobConfig.MRR_VERTEX_PREFIX + vertexName + "."
+ + originalPropertyName;
}
// TODO Get rid of this. Temporary for testing.
@@ -117,4 +154,27 @@ public class MultiStageMRConfigUtil {
System.err.println("Key: " + key + ", Value: " + value);
}
}
+
+ // TODO MRR FIXME based on conf format.
+ /**
+  * Extracts the entries of baseConf whose keys start with prefix into a new
+  * Configuration (created with loadDefaults = false), stripping the leading
+  * prefix from each key.
+  *
+  * @param baseConf source configuration; modified only when remove is true
+  * @param prefix key prefix selecting the stage/vertex specific entries
+  * @param remove if true, each matching prefixed key is unset in baseConf
+  * @param stageOnly if true, only the prefixed entries are returned; if false,
+  *          all other entries of baseConf are copied too, and the stripped
+  *          prefixed entries override same-named base entries
+  * @return the new Configuration
+  */
+ private static Configuration getBasicIntermediateStageConfInternal(
+ Configuration baseConf, String prefix, boolean remove, boolean stageOnly) {
+ Configuration conf = new Configuration(false);
+ // Copy the non-specific entries first so the specific entries applied
+ // below always take precedence, independent of iteration order.
+ if (!stageOnly) {
+ Iterator<Entry<String, String>> baseEntries = baseConf.iterator();
+ while (baseEntries.hasNext()) {
+ Entry<String, String> entry = baseEntries.next();
+ if (!entry.getKey().startsWith(prefix)) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ // NOTE(review): unsetting while iterating assumes Configuration.iterator()
+ // walks a copy of the properties (as in Hadoop 2.x) - confirm on upgrade.
+ Iterator<Entry<String, String>> confEntries = baseConf.iterator();
+ while (confEntries.hasNext()) {
+ Entry<String, String> entry = confEntries.next();
+ String key = entry.getKey();
+ if (key.startsWith(prefix)) {
+ // Strip only the leading prefix; String.replace would also drop any
+ // later occurrence of the prefix inside the key.
+ conf.set(key.substring(prefix.length()), entry.getValue());
+ if (remove) {
+ baseConf.unset(key);
+ }
+ }
+ }
+ return conf;
+ }
+
+
+
}
Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1471779&r1=1471778&r2=1471779&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Wed Apr 24 23:34:26 2013
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
@@ -510,8 +511,9 @@ public class YARNRunner implements Clien
int stageNum = iReduceIndex + 1;
Configuration conf = MultiStageMRConfigUtil.getIntermediateStageConf(jobConf, stageNum);
int numTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ // Intermediate vertices start at 1.
Vertex vertex = new Vertex(
- MultiStageMRConfigUtil.getIntermediateReduceVertexName(stageNum),
+ MultiStageMRConfigUtil.getIntermediateStageVertexName(stageNum),
"org.apache.tez.mapreduce.task.IntermediateTask", numTasks);
Map<String, String> reduceEnv = new HashMap<String, String>();
@@ -575,7 +577,7 @@ public class YARNRunner implements Clien
String mapProcessor = mapOnly ?
"org.apache.tez.mapreduce.task.MapOnlyTask"
: "org.apache.tez.mapreduce.task.InitialTask";
- Vertex mapVertex = new Vertex("map", mapProcessor, numMaps);
+ Vertex mapVertex = new Vertex(MultiStageMRConfigUtil.getInitialMapVertexName(), mapProcessor, numMaps);
// FIXME set up map environment
Map<String, String> mapEnv = new HashMap<String, String>();
@@ -616,7 +618,7 @@ public class YARNRunner implements Clien
if (numReduces > 0) {
String reduceProcessor =
"org.apache.tez.mapreduce.task.FinalTask";
- Vertex reduceVertex = new Vertex("reduce", reduceProcessor, numReduces);
+ Vertex reduceVertex = new Vertex(MultiStageMRConfigUtil.getFinalReduceVertexName(), reduceProcessor, numReduces);
// FIXME set up reduce environment
Map<String, String> reduceEnv = new HashMap<String, String>();
@@ -825,6 +827,19 @@ public class YARNRunner implements Clien
return appContext;
}
+ /**
+  * Writes tezConf as XML to the job configuration file
+  * (MRJobConfig.JOB_CONF_FILE) under jobSubmitDir, created with
+  * DAG_FILE_PERMISSION. The stream is always closed, even if writing fails.
+  */
+ private void writeTezConf(String jobSubmitDir, FileSystem fs,
+ Configuration tezConf) throws IOException {
+ Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+ FsPermission jobConfPermission = new FsPermission(DAG_FILE_PERMISSION);
+ FSDataOutputStream jobConfOut =
+ FileSystem.create(fs, jobConfPath, jobConfPermission);
+ try {
+ tezConf.writeXml(jobConfOut);
+ } finally {
+ jobConfOut.close();
+ }
+ }
+
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
@@ -840,14 +855,18 @@ public class YARNRunner implements Clien
FileSystem fs = FileSystem.get(conf);
JobConf jobConf = new JobConf(new TezConfiguration(conf));
+ Configuration tezJobConf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+
+ // This will replace job.xml in the staging dir.
+ writeTezConf(jobSubmitDir, fs, tezJobConf);
// FIXME set up job resources
Map<String, LocalResource> jobLocalResources =
- createJobLocalResources(jobConf, jobSubmitDir);
+ createJobLocalResources(tezJobConf, jobSubmitDir);
DAG dag = createDAG(fs, jobId, jobConf, jobSubmitDir, ts,
jobLocalResources);
ApplicationSubmissionContext appContext =
- createApplicationSubmissionContext(fs, dag, jobConf, jobSubmitDir, ts,
+ createApplicationSubmissionContext(fs, dag, tezJobConf, jobSubmitDir, ts,
jobLocalResources);
// Submit to ResourceManager