You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/14 00:19:48 UTC

[2/2] git commit: TEZ-1347. Consolidate MRHelpers. (sseth)

TEZ-1347. Consolidate MRHelpers. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0d5fdf3d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0d5fdf3d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0d5fdf3d

Branch: refs/heads/master
Commit: 0d5fdf3dec8c22ba642eaa10b760fa8aced1c74e
Parents: 0f2abb1
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 13 15:19:29 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 13 15:19:29 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/client/TezClientUtils.java   |   1 +
 .../org/apache/tez/common/TezCommonUtils.java   |   1 +
 .../java/org/apache/tez/common/TezUtils.java    | 151 ++++++++
 .../api/events/InputInitializerEvent.java       |   3 +
 .../java/org/apache/tez/common/TezUtils.java    | 320 -----------------
 .../org/apache/tez/common/TezUtilsInternal.java | 224 ++++++++++++
 .../org/apache/tez/common/TestTezUtils.java     |  34 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   6 +-
 .../impl/SimpleHistoryLoggingService.java       |   4 +-
 .../org/apache/tez/runtime/task/TezChild.java   |   8 +-
 .../mapreduce/examples/FilterLinesByWord.java   |   8 +-
 .../examples/FilterLinesByWordOneToOne.java     |   8 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  19 +-
 .../examples/TestOrderedWordCount.java          |  19 +-
 .../processor/FilterByWordInputProcessor.java   |   1 -
 .../apache/tez/mapreduce/client/YARNRunner.java |  17 +-
 .../mapreduce/committer/MROutputCommitter.java  |   4 +-
 .../common/MRInputAMSplitGenerator.java         |   5 +-
 .../common/MRInputSplitDistributor.java         |   4 +-
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 356 +++++++------------
 .../org/apache/tez/mapreduce/input/MRInput.java |  12 +-
 .../tez/mapreduce/input/base/MRInputBase.java   |   6 +-
 .../apache/tez/mapreduce/output/MROutput.java   |   4 +-
 .../apache/tez/mapreduce/processor/MRTask.java  |   2 +-
 .../common/TestMRInputSplitDistributor.java     |   6 +-
 .../hadoop/TestConfigTranslationMRToTez.java    |   4 +-
 .../mapreduce/hadoop/TestDeprecatedKeys.java    |   4 +-
 .../tez/mapreduce/hadoop/TestMRHelpers.java     |  29 +-
 .../tez/mapreduce/processor/MapUtils.java       |   6 +-
 .../processor/map/TestMapProcessor.java         |   4 +-
 .../processor/reduce/TestReduceProcessor.java   |   7 +-
 .../runtime/api/impl/TezCountersDelegate.java   |   6 +-
 .../library/common/shuffle/impl/Fetcher.java    |   5 +-
 .../common/shuffle/impl/MergeManager.java       |  11 +-
 .../library/common/shuffle/impl/Shuffle.java    |   4 +-
 .../shuffle/impl/ShuffleInputEventHandler.java  |   4 +-
 .../common/shuffle/impl/ShuffleScheduler.java   |   4 +-
 .../common/sort/impl/PipelinedSorter.java       |   5 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   4 +-
 .../writers/UnorderedPartitionedKVWriter.java   |   8 +-
 .../runtime/library/input/LocalMergedInput.java |   1 -
 .../library/input/ShuffledMergedInput.java      |   2 +-
 .../library/input/ShuffledUnorderedKVInput.java |   3 +-
 .../library/output/OnFileSortedOutput.java      |   6 +-
 .../library/output/OnFileUnorderedKVOutput.java |   5 +-
 .../OnFileUnorderedPartitionedKVOutput.java     |   2 +-
 .../impl/ShuffleInputEventHandlerImpl.java      |   4 +-
 .../shuffle/common/impl/ShuffleManager.java     |   4 +-
 .../impl/TestShuffleInputEventHandler.java      |   4 +-
 .../TestUnorderedPartitionedKVWriter.java       |   6 +-
 .../library/output/TestOnFileSortedOutput.java  |   2 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  |   5 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  22 +-
 .../tez/test/dag/SixLevelsFailingDAG.java       |   1 -
 .../tez/test/dag/ThreeLevelsFailingDAG.java     |   2 -
 56 files changed, 674 insertions(+), 724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 05dd880..ec78b85 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,6 +54,7 @@ INCOMPATIBLE CHANGES
   TEZ-1237. Consolidate naming of API classes
   TEZ-1407. Move MRInput related methods out of MRHelpers and consolidate.
   TEZ-1194. Make TezUserPayload user facing for payload specification
+  TEZ-1347. Consolidate MRHelpers.
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 52e5027..8827108 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -100,6 +100,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
+@Private
 public class TezClientUtils {
 
   private static Log LOG = LogFactory.getLog(TezClientUtils.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index c15324c..edb8e8e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 
 import com.google.protobuf.ByteString;
 
+@Private
 public class TezCommonUtils {
   public static final FsPermission TEZ_AM_DIR_PERMISSION = FsPermission
       .createImmutable((short) 0700); // rwx--------

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
new file mode 100644
index 0000000..f24dc11
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.records.DAGProtos;
+
+
+/**
+ * Utility methods for setting up a DAG. Has helpers for setting up log4j configuration, converting
+ * {@link org.apache.hadoop.conf.Configuration} to {@link org.apache.tez.dag.api.UserPayload} etc.
+ */
+@InterfaceAudience.Public
+public class TezUtils {
+
+
+  /**
+   * Allows changing the log level for task / AM logging. <p>
+   *
+   * Adds the JVM system properties necessary to configure
+   * {@link org.apache.hadoop.yarn.ContainerLogAppender}.
+   *
+   * @param logLevel the desired log level (eg INFO/WARN/DEBUG)
+   * @param vargs    the argument list to append to
+   */
+  public static void addLog4jSystemProperties(String logLevel,
+                                              List<String> vargs) {
+    TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
+  }
+
+
+  /**
+   * Convert a Configuration to compressed ByteString using Protocol buffer
+   *
+   * @param conf
+   *          : Configuration to be converted
+   * @return PB ByteString (compressed)
+   * @throws java.io.IOException
+   */
+  public static ByteString createByteStringFromConf(Configuration conf) throws IOException {
+    Preconditions.checkNotNull(conf, "Configuration must be specified");
+    ByteString.Output os = ByteString.newOutput();
+    DeflaterOutputStream compressOs = new DeflaterOutputStream(os,
+        new Deflater(Deflater.BEST_SPEED));
+    try {
+      writeConfInPB(compressOs, conf);
+    } finally {
+      if (compressOs != null) {
+        compressOs.close();
+      }
+    }
+    return os.toByteString();
+  }
+
+  /**
+   * Convert a Configuration to a {@link org.apache.tez.dag.api.UserPayload}.
+   *
+   *
+   * @param conf configuration to be converted
+   * @return an instance of {@link org.apache.tez.dag.api.UserPayload}
+   * @throws java.io.IOException
+   */
+  public static UserPayload createUserPayloadFromConf(Configuration conf) throws IOException {
+    return new UserPayload(createByteStringFromConf(conf).toByteArray());
+  }
+
+  /**
+   * Convert a byte string to a Configuration object
+   *
+   * @param byteString byteString representation of the conf created using {@link
+   *                   #createByteStringFromConf(org.apache.hadoop.conf.Configuration)}
+   * @return Configuration
+   * @throws java.io.IOException
+   */
+  public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
+    Preconditions.checkNotNull(byteString, "ByteString must be specified");
+    // SnappyInputStream uncompressIs = new
+    // SnappyInputStream(byteString.newInput());
+    InflaterInputStream uncompressIs = new InflaterInputStream(byteString.newInput());
+    DAGProtos.ConfigurationProto confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
+    Configuration conf = new Configuration(false);
+    readConfFromPB(confProto, conf);
+    return conf;
+  }
+
+  /**
+   * Convert an instance of {@link org.apache.tez.dag.api.UserPayload} to {@link
+   * org.apache.hadoop.conf.Configuration}
+   *
+   * @param payload {@link org.apache.tez.dag.api.UserPayload} created using {@link
+   *                #createUserPayloadFromConf(org.apache.hadoop.conf.Configuration)}
+   * @return Configuration
+   * @throws java.io.IOException
+   */
+  public static Configuration createConfFromUserPayload(UserPayload payload) throws IOException {
+    return createConfFromByteString(ByteString.copyFrom(payload.getPayload()));
+  }
+
+
+  private static void writeConfInPB(OutputStream dos, Configuration conf) throws IOException {
+    DAGProtos.ConfigurationProto.Builder confProtoBuilder = DAGProtos.ConfigurationProto
+        .newBuilder();
+    Iterator<Map.Entry<String, String>> iter = conf.iterator();
+    while (iter.hasNext()) {
+      Map.Entry<String, String> entry = iter.next();
+      DAGProtos.PlanKeyValuePair.Builder kvp = DAGProtos.PlanKeyValuePair.newBuilder();
+      kvp.setKey(entry.getKey());
+      kvp.setValue(entry.getValue());
+      confProtoBuilder.addConfKeyValues(kvp);
+    }
+    DAGProtos.ConfigurationProto confProto = confProtoBuilder.build();
+    confProto.writeTo(dos);
+  }
+
+  private static void readConfFromPB(DAGProtos.ConfigurationProto confProto, Configuration conf) {
+    List<DAGProtos.PlanKeyValuePair> settingList = confProto.getConfKeyValuesList();
+    for (DAGProtos.PlanKeyValuePair setting : settingList) {
+      conf.set(setting.getKey(), setting.getValue());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
index 87198c1..3d1ed11 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
@@ -20,6 +20,7 @@
 
 package org.apache.tez.runtime.api.events;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.tez.runtime.api.InputInitializer;
 import org.apache.tez.runtime.api.Event;
@@ -46,6 +47,8 @@ public class InputInitializerEvent extends Event {
    */
   public InputInitializerEvent(String targetVertexName, String targetInputName,
                                    byte[] eventPayload, int version) {
+    Preconditions.checkNotNull(targetVertexName, "TargetVertexName cannot be null");
+    Preconditions.checkNotNull(targetInputName, "TargetInputName cannot be null");
     this.targetVertexName = targetVertexName;
     this.targetInputName = targetInputName;
     this.version = version;

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
deleted file mode 100644
index 3b3c7a7..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.DataFormatException;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.Inflater;
-import java.util.zip.InflaterInputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Logger;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
-import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.protobuf.ByteString;
-
-public class TezUtils {
-
-  private static final Log LOG = LogFactory.getLog(TezUtils.class);
-  private static final Random RANDOM = new Random();
-
-  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
-      IOException {
-    FileInputStream confPBBinaryStream = null;
-    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-    try {
-      confPBBinaryStream =
-          new FileInputStream(new File(baseDir, TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
-      confProtoBuilder.mergeFrom(confPBBinaryStream);
-    } finally {
-      if (confPBBinaryStream != null) {
-        confPBBinaryStream.close();
-      }
-    }
-
-    ConfigurationProto confProto = confProtoBuilder.build();
-
-    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
-    if (kvPairList != null && !kvPairList.isEmpty()) {
-      for (PlanKeyValuePair kvPair : kvPairList) {
-        conf.set(kvPair.getKey(), kvPair.getValue());
-      }
-    }
-  }
-
-  /**
-   * Convert a Configuration to compressed ByteString using Protocol buffer
-   * 
-   * @param conf
-   *          : Configuration to be converted
-   * @return PB ByteString (compressed)
-   * @throws IOException
-   */
-  public static ByteString createByteStringFromConf(Configuration conf) throws IOException {
-    Preconditions.checkNotNull(conf, "Configuration must be specified");
-    ByteString.Output os = ByteString.newOutput();
-    DeflaterOutputStream compressOs = new DeflaterOutputStream(os,
-        new Deflater(Deflater.BEST_SPEED));
-    try {
-      writeConfInPB(compressOs, conf);
-    } finally {
-      if (compressOs != null) {
-        compressOs.close();
-      }
-    }
-    return os.toByteString();
-  }
-
-  /**
-   * Convert a Configuration to compressed user pay load (i.e. byte[]) using
-   * Protocol buffer
-   * 
-   * @param conf
-   *          : Configuration to be converted
-   * @return compressed pay load
-   * @throws IOException
-   */
-  public static UserPayload createUserPayloadFromConf(Configuration conf) throws IOException {
-    return new UserPayload(createByteStringFromConf(conf).toByteArray());
-  }
-
-  /**
-   * Convert compressed byte string to a Configuration object using protocol
-   * buffer
-   * 
-   * @param byteString
-   *          :compressed conf in Protocol buffer
-   * @return Configuration
-   * @throws IOException
-   */
-  public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
-    Preconditions.checkNotNull(byteString, "ByteString must be specified");
-    // SnappyInputStream uncompressIs = new
-    // SnappyInputStream(byteString.newInput());
-    InflaterInputStream uncompressIs = new InflaterInputStream(byteString.newInput());
-    ConfigurationProto confProto = ConfigurationProto.parseFrom(uncompressIs);
-    Configuration conf = new Configuration(false);
-    readConfFromPB(confProto, conf);
-    return conf;
-  }
-
-  /**
-   * Convert compressed pay load in byte[] to a Configuration object using
-   * protocol buffer
-   * 
-   * @param payload
-   *          : compressed pay load
-   * @return Configuration
-   * @throws IOException
-   */
-  public static Configuration createConfFromUserPayload(UserPayload payload) throws IOException {
-    return createConfFromByteString(ByteString.copyFrom(payload.getPayload()));
-  }
-
-  private static void writeConfInPB(OutputStream dos, Configuration conf) throws IOException {
-    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
-    Iterator<Entry<String, String>> iter = conf.iterator();
-    while (iter.hasNext()) {
-      Entry<String, String> entry = iter.next();
-      PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
-      kvp.setKey(entry.getKey());
-      kvp.setValue(entry.getValue());
-      confProtoBuilder.addConfKeyValues(kvp);
-    }
-    ConfigurationProto confProto = confProtoBuilder.build();
-    confProto.writeTo(dos);
-  }
-
-  private static void readConfFromPB(ConfigurationProto confProto, Configuration conf) {
-    List<PlanKeyValuePair> settingList = confProto.getConfKeyValuesList();
-    for (PlanKeyValuePair setting : settingList) {
-      conf.set(setting.getKey(), setting.getValue());
-    }
-  }
-
-  public static byte[] compressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
-    byte[] compressed = compressBytesInflateDeflate(inBytes);
-    if (LOG.isDebugEnabled()) {
-      sw.stop();
-      LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
-          + ", CompressTime: " + sw.elapsedMillis());
-    }
-    return compressed;
-  }
-
-  public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
-    Stopwatch sw = null;
-    if (LOG.isDebugEnabled()) {
-      sw = new Stopwatch().start();
-    }
-    byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
-    if (LOG.isDebugEnabled()) {
-      sw.stop();
-      LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
-          + ", UncompressTimeTaken: " + sw.elapsedMillis());
-    }
-    return uncompressed;
-  }
-
-  private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
-    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
-    deflater.setInput(inBytes);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
-    deflater.finish();
-    byte[] buffer = new byte[1024 * 8];
-    while (!deflater.finished()) {
-      int count = deflater.deflate(buffer);
-      bos.write(buffer, 0, count);
-    }
-    byte[] output = bos.toByteArray();
-    return output;
-  }
-
-  private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
-    Inflater inflater = new Inflater();
-    inflater.setInput(inBytes);
-    ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
-    byte[] buffer = new byte[1024 * 8];
-    while (!inflater.finished()) {
-      int count;
-      try {
-        count = inflater.inflate(buffer);
-      } catch (DataFormatException e) {
-        throw new IOException(e);
-      }
-      bos.write(buffer, 0, count);
-    }
-    byte[] output = bos.toByteArray();
-    return output;
-  }
-
-  private static final Pattern pattern = Pattern.compile("\\W");
-  @Private
-  public static final int MAX_VERTEX_NAME_LENGTH = 40;
-
-  @Private
-  public static String cleanVertexName(String vertexName) {
-    return sanitizeString(vertexName).substring(0,
-        vertexName.length() > MAX_VERTEX_NAME_LENGTH ? MAX_VERTEX_NAME_LENGTH : vertexName.length());
-  }
-
-  private static String sanitizeString(String srcString) {
-    Matcher matcher = pattern.matcher(srcString);
-    String res = matcher.replaceAll("_");
-    return res; // Number starts allowed rightnow
-  }
-
-  public static void updateLoggers(String addend) throws FileNotFoundException {
-
-    LOG.info("Redirecting log file based on addend: " + addend);
-
-    Appender appender = Logger.getRootLogger().getAppender(
-        TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
-    if (appender != null) {
-      if (appender instanceof TezContainerLogAppender) {
-        TezContainerLogAppender claAppender = (TezContainerLogAppender) appender;
-        claAppender.setLogFileName(constructLogFileName(
-            TezConfiguration.TEZ_CONTAINER_LOG_FILE_NAME, addend));
-        claAppender.activateOptions();
-      } else {
-        LOG.warn("Appender is a " + appender.getClass() + "; require an instance of "
-            + TezContainerLogAppender.class.getName() + " to reconfigure the logger output");
-      }
-    } else {
-      LOG.warn("Not configured with appender named: " + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME
-          + ". Cannot reconfigure logger output");
-    }
-  }
-
-  private static String constructLogFileName(String base, String addend) {
-    if (addend == null || addend.isEmpty()) {
-      return base;
-    } else {
-      return base + "_" + addend;
-    }
-  }
-
-  public static BitSet fromByteArray(byte[] bytes) {
-    if (bytes == null) {
-      return new BitSet();
-    }
-    BitSet bits = new BitSet();
-    for (int i = 0; i < bytes.length * 8; i++) {
-      if ((bytes[(bytes.length) - (i / 8) - 1] & (1 << (i % 8))) > 0) {
-        bits.set(i);
-      }
-    }
-    return bits;
-  }
-
-  public static byte[] toByteArray(BitSet bits) {
-    if (bits == null) {
-      return null;
-    }
-    byte[] bytes = new byte[bits.length() / 8 + 1];
-    for (int i = 0; i < bits.length(); i++) {
-      if (bits.get(i)) {
-        bytes[(bytes.length) - (i / 8) - 1] |= 1 << (i % 8);
-      }
-    }
-    return bytes;
-  }
-
-  public static String getContainerLogDir() {
-    String logDirsStr  = System.getenv(Environment.LOG_DIRS.name());
-    if (logDirsStr == null || logDirsStr.isEmpty()) {
-      return null;
-    }
-    String[] logDirs = StringUtils.split(logDirsStr, ',');
-    if (logDirs.length == 0) {
-      return null;
-    }
-    int logIndex = RANDOM.nextInt(logDirs.length);
-    return logDirs[logIndex];
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
new file mode 100644
index 0000000..05feae7
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Logger;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+
+import com.google.common.base.Stopwatch;
+
+@Private
+public class TezUtilsInternal {
+
+  private static final Log LOG = LogFactory.getLog(TezUtilsInternal.class);
+  private static final Random RANDOM = new Random();
+
+  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+      IOException {
+    FileInputStream confPBBinaryStream = null;
+    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+    try {
+      confPBBinaryStream =
+          new FileInputStream(new File(baseDir, TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
+      confProtoBuilder.mergeFrom(confPBBinaryStream);
+    } finally {
+      if (confPBBinaryStream != null) {
+        confPBBinaryStream.close();
+      }
+    }
+
+    ConfigurationProto confProto = confProtoBuilder.build();
+
+    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+    if (kvPairList != null && !kvPairList.isEmpty()) {
+      for (PlanKeyValuePair kvPair : kvPairList) {
+        conf.set(kvPair.getKey(), kvPair.getValue());
+      }
+    }
+  }
+
+
+  public static byte[] compressBytes(byte[] inBytes) throws IOException {
+    Stopwatch sw = null;
+    if (LOG.isDebugEnabled()) {
+      sw = new Stopwatch().start();
+    }
+    byte[] compressed = compressBytesInflateDeflate(inBytes);
+    if (LOG.isDebugEnabled()) {
+      sw.stop();
+      LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
+          + ", CompressTime: " + sw.elapsedMillis());
+    }
+    return compressed;
+  }
+
+  public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
+    Stopwatch sw = null;
+    if (LOG.isDebugEnabled()) {
+      sw = new Stopwatch().start();
+    }
+    byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+    if (LOG.isDebugEnabled()) {
+      sw.stop();
+      LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
+          + ", UncompressTimeTaken: " + sw.elapsedMillis());
+    }
+    return uncompressed;
+  }
+
+  private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
+    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    deflater.setInput(inBytes);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+    deflater.finish();
+    byte[] buffer = new byte[1024 * 8];
+    while (!deflater.finished()) {
+      int count = deflater.deflate(buffer);
+      bos.write(buffer, 0, count);
+    }
+    byte[] output = bos.toByteArray();
+    return output;
+  }
+
+  private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
+    Inflater inflater = new Inflater();
+    inflater.setInput(inBytes);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+    byte[] buffer = new byte[1024 * 8];
+    while (!inflater.finished()) {
+      int count;
+      try {
+        count = inflater.inflate(buffer);
+      } catch (DataFormatException e) {
+        throw new IOException(e);
+      }
+      bos.write(buffer, 0, count);
+    }
+    byte[] output = bos.toByteArray();
+    return output;
+  }
+
+  private static final Pattern pattern = Pattern.compile("\\W");
+  @Private
+  public static final int MAX_VERTEX_NAME_LENGTH = 40;
+
+  @Private
+  public static String cleanVertexName(String vertexName) {
+    return sanitizeString(vertexName).substring(0,
+        vertexName.length() > MAX_VERTEX_NAME_LENGTH ? MAX_VERTEX_NAME_LENGTH : vertexName.length());
+  }
+
+  private static String sanitizeString(String srcString) {
+    Matcher matcher = pattern.matcher(srcString);
+    String res = matcher.replaceAll("_");
+    return res; // Number starts allowed right now
+  }
+
+  public static void updateLoggers(String addend) throws FileNotFoundException {
+
+    LOG.info("Redirecting log file based on addend: " + addend);
+
+    Appender appender = Logger.getRootLogger().getAppender(
+        TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
+    if (appender != null) {
+      if (appender instanceof TezContainerLogAppender) {
+        TezContainerLogAppender claAppender = (TezContainerLogAppender) appender;
+        claAppender.setLogFileName(constructLogFileName(
+            TezConfiguration.TEZ_CONTAINER_LOG_FILE_NAME, addend));
+        claAppender.activateOptions();
+      } else {
+        LOG.warn("Appender is a " + appender.getClass() + "; require an instance of "
+            + TezContainerLogAppender.class.getName() + " to reconfigure the logger output");
+      }
+    } else {
+      LOG.warn("Not configured with appender named: " + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME
+          + ". Cannot reconfigure logger output");
+    }
+  }
+
+  private static String constructLogFileName(String base, String addend) {
+    if (addend == null || addend.isEmpty()) {
+      return base;
+    } else {
+      return base + "_" + addend;
+    }
+  }
+
+  public static BitSet fromByteArray(byte[] bytes) {
+    if (bytes == null) {
+      return new BitSet();
+    }
+    BitSet bits = new BitSet();
+    for (int i = 0; i < bytes.length * 8; i++) {
+      if ((bytes[(bytes.length) - (i / 8) - 1] & (1 << (i % 8))) > 0) {
+        bits.set(i);
+      }
+    }
+    return bits;
+  }
+
+  public static byte[] toByteArray(BitSet bits) {
+    if (bits == null) {
+      return null;
+    }
+    byte[] bytes = new byte[bits.length() / 8 + 1];
+    for (int i = 0; i < bits.length(); i++) {
+      if (bits.get(i)) {
+        bytes[(bytes.length) - (i / 8) - 1] |= 1 << (i % 8);
+      }
+    }
+    return bytes;
+  }
+
+  public static String getContainerLogDir() {
+    String logDirsStr  = System.getenv(Environment.LOG_DIRS.name());
+    if (logDirsStr == null || logDirsStr.isEmpty()) {
+      return null;
+    }
+    String[] logDirs = StringUtils.split(logDirsStr, ',');
+    if (logDirs.length == 0) {
+      return null;
+    }
+    int logIndex = RANDOM.nextInt(logDirs.length);
+    return logDirs[logIndex];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index 852d6dc..e1036a5 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -56,10 +56,10 @@ public class TestTezUtils {
   @Test
   public void testCleanVertexName() {
     String testString = "special characters & spaces and longer than "
-        + TezUtils.MAX_VERTEX_NAME_LENGTH + " characters";
-    Assert.assertTrue(testString.length() > TezUtils.MAX_VERTEX_NAME_LENGTH);
-    String cleaned = TezUtils.cleanVertexName(testString);
-    Assert.assertTrue(cleaned.length() <= TezUtils.MAX_VERTEX_NAME_LENGTH);
+        + TezUtilsInternal.MAX_VERTEX_NAME_LENGTH + " characters";
+    Assert.assertTrue(testString.length() > TezUtilsInternal.MAX_VERTEX_NAME_LENGTH);
+    String cleaned = TezUtilsInternal.cleanVertexName(testString);
+    Assert.assertTrue(cleaned.length() <= TezUtilsInternal.MAX_VERTEX_NAME_LENGTH);
     Assert.assertFalse(cleaned.contains("\\s+"));
     Assert.assertTrue(cleaned.matches("\\w+"));
   }
@@ -67,39 +67,39 @@ public class TestTezUtils {
   @Test
   public void testBitSetToByteArray() {
     BitSet bitSet = createBitSet(0);
-    byte[] bytes = TezUtils.toByteArray(bitSet);
+    byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
     Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
 
     bitSet = createBitSet(1000);
-    bytes = TezUtils.toByteArray(bitSet);
+    bytes = TezUtilsInternal.toByteArray(bitSet);
     Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
   }
 
   @Test
   public void testBitSetFromByteArray() {
     BitSet bitSet = createBitSet(0);
-    byte[] bytes = TezUtils.toByteArray(bitSet);
-    Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
-    Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+    byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
+    Assert.assertEquals(TezUtilsInternal.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+    Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
 
     bitSet = createBitSet(1);
-    bytes = TezUtils.toByteArray(bitSet);
-    Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
-    Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+    bytes = TezUtilsInternal.toByteArray(bitSet);
+    Assert.assertEquals(TezUtilsInternal.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+    Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
     
     bitSet = createBitSet(1000);
-    bytes = TezUtils.toByteArray(bitSet);
-    Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
-    Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+    bytes = TezUtilsInternal.toByteArray(bitSet);
+    Assert.assertEquals(TezUtilsInternal.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+    Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
   }
 
   @Test
   public void testBitSetConversion() {
     for (int i = 0 ; i < 16 ; i++) {
       BitSet bitSet = createBitSetWithSingleEntry(i);
-      byte[] bytes = TezUtils.toByteArray(bitSet);
+      byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
       
-      BitSet deseraialized = TezUtils.fromByteArray(bytes);
+      BitSet deseraialized = TezUtilsInternal.fromByteArray(bytes);
       Assert.assertEquals(bitSet, deseraialized);
       Assert.assertEquals(bitSet.cardinality(), deseraialized.cardinality());
       Assert.assertEquals(1, deseraialized.cardinality());

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3757c62..e5143fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -90,7 +90,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
-import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.impl.LogUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
@@ -546,7 +546,7 @@ public class DAGAppMaster extends AbstractService {
 
   private void _updateLoggers(DAG dag, String appender) {
     try {
-      TezUtils.updateLoggers(dag.getID().toString() + appender);
+      TezUtilsInternal.updateLoggers(dag.getID().toString() + appender);
     } catch (FileNotFoundException e) {
       LOG.warn("Unable to update the logger. Continue with the old logger", e );
     }
@@ -1866,7 +1866,7 @@ public class DAGAppMaster extends AbstractService {
       InterruptedException {
 
     final Configuration conf = new Configuration(new YarnConfiguration());
-    TezUtils.addUserSpecifiedTezConfiguration(appMaster.workingDirectory, conf);
+    TezUtilsInternal.addUserSpecifiedTezConfiguration(appMaster.workingDirectory, conf);
 
     // Do not automatically close FileSystem objects so that in case of
     // SIGTERM I have a chance to write out the job history. I'll be closing

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 8ef1920..1cbf516 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -62,7 +62,7 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
     String logDirPath = conf.get(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR);
     final String logFileName = LOG_FILE_NAME_PREFIX + "." + appContext.getApplicationAttemptId();
     if (logDirPath == null || logDirPath.isEmpty()) {
-      String logDir = TezUtils.getContainerLogDir();
+      String logDir = TezUtilsInternal.getContainerLogDir();
       LOG.info("Log file location for SimpleHistoryLoggingService not specified, defaulting to"
           + " containerLogDir=" + logDir);
       Path p;

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index d005dbd..1ff393b 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -52,7 +52,7 @@ import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
 import org.apache.tez.common.TezLocalResource;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
@@ -175,7 +175,7 @@ public class TezChild {
 
     while (!executor.isTerminated()) {
       if (taskCount > 0) {
-        TezUtils.updateLoggers("");
+        TezUtilsInternal.updateLoggers("");
       }
       ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
       ContainerTask containerTask = null;
@@ -201,7 +201,7 @@ public class TezChild {
       } else {
         String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
         taskCount++;
-        TezUtils.updateLoggers(loggerAddend);
+        TezUtilsInternal.updateLoggers(loggerAddend);
         FileSystem.clearStatistics();
 
         childUGI = handleNewTaskCredentials(containerTask, childUGI);
@@ -389,7 +389,7 @@ public class TezChild {
     // Pull in configuration specified for the session.
     // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
     // for each and every task, and reading it back from disk. Also needs to be per vertex.
-    TezUtils.addUserSpecifiedTezConfiguration(workingDirectory, conf);
+    TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
     UserGroupInformation.setConfiguration(conf);
     Limits.setConfiguration(conf);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 3a081bb..4a894a3 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -69,7 +70,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -167,7 +167,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
     stage2Conf.setBoolean("mapred.mapper.new-api", false);
 
-    UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
@@ -188,12 +188,12 @@ public class FilterLinesByWord extends Configured implements Tool {
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
         FilterByWordOutputProcessor.class.getName()).setUserPayload(
-        MRHelpers.createUserPayloadFromConf(stage2Conf)), 1);
+        TezUtils.createUserPayloadFromConf(stage2Conf)), 1);
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
-        .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf));
+        .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf));
     OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
     stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index a77713f..63a762d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -62,7 +63,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -158,7 +158,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
     stage2Conf.setBoolean("mapred.mapper.new-api", false);
 
-    UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
@@ -178,7 +178,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
-        FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
+        FilterByWordOutputProcessor.class.getName()).setUserPayload(TezUtils
         .createUserPayloadFromConf(stage2Conf)), dsd.getNumberOfShards());
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
@@ -186,7 +186,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     stage2Vertex.addDataSink(
         "MROutput",
         new DataSinkDescriptor(new OutputDescriptor(MROutput.class.getName())
-            .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf)),
+            .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf)),
             new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
 
     UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 0562633..b22a4a8 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
@@ -428,7 +429,7 @@ public class MRRSleepJob extends Configured implements Tool {
           NullOutputFormat.class.getName());
     }
 
-    MRHelpers.translateVertexConfToTez(mapStageConf);
+    MRHelpers.translateMRConfToTez(mapStageConf);
 
     Configuration[] intermediateReduceStageConfs = null;
     if (iReduceStagesCount > 0
@@ -449,7 +450,7 @@ public class MRRSleepJob extends Configured implements Tool {
             MRRSleepJobPartitioner.class.getName());
 
 
-        MRHelpers.translateVertexConfToTez(iReduceStageConf);
+        MRHelpers.translateMRConfToTez(iReduceStageConf);
         intermediateReduceStageConfs[i-1] = iReduceStageConf;
       }
     }
@@ -468,18 +469,18 @@ public class MRRSleepJob extends Configured implements Tool {
       finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
           NullOutputFormat.class.getName());
 
-      MRHelpers.translateVertexConfToTez(finalReduceConf);
+      MRHelpers.translateMRConfToTez(finalReduceConf);
     }
 
-    MRHelpers.doJobClientMagic(mapStageConf);
+    MRHelpers.configureMRApiUsage(mapStageConf);
     if (iReduceStagesCount > 0
         && numIReducer > 0) {
       for (int i = 0; i < iReduceStagesCount; ++i) {
-        MRHelpers.doJobClientMagic(intermediateReduceStageConfs[i]);
+        MRHelpers.configureMRApiUsage(intermediateReduceStageConfs[i]);
       }
     }
     if (numReducer > 0) {
-      MRHelpers.doJobClientMagic(finalReduceConf);
+      MRHelpers.configureMRApiUsage(finalReduceConf);
     }
 
     DataSourceDescriptor dataSource = null;
@@ -520,7 +521,7 @@ public class MRRSleepJob extends Configured implements Tool {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     
-    UserPayload mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+    UserPayload mapUserPayload = TezUtils.createUserPayloadFromConf(mapStageConf);
     int numTasks = generateSplitsInAM ? -1 : numMapper;
 
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
@@ -534,7 +535,7 @@ public class MRRSleepJob extends Configured implements Tool {
       for (int i = 0; i < iReduceStagesCount; ++i) {
         Configuration iconf =
             intermediateReduceStageConfs[i];
-        UserPayload iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
+        UserPayload iReduceUserPayload = TezUtils.createUserPayloadFromConf(iconf);
         Vertex ivertex = new Vertex("ireduce" + (i+1),
                 new ProcessorDescriptor(ReduceProcessor.class.getName()).
                 setUserPayload(iReduceUserPayload), numIReducer);
@@ -545,7 +546,7 @@ public class MRRSleepJob extends Configured implements Tool {
 
     Vertex finalReduceVertex = null;
     if (numReducer > 0) {
-      UserPayload reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+      UserPayload reducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
           ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
       finalReduceVertex.setTaskLocalFiles(commonLocalResources);

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 255dcbd..72c14fc 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -155,7 +155,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
         TokenizerMapper.class.getName());
 
-    MRHelpers.translateVertexConfToTez(mapStageConf);
+    MRHelpers.translateMRConfToTez(mapStageConf);
 
     Configuration iReduceStageConf = new JobConf(conf);
     // TODO replace with auto-reduce parallelism
@@ -166,7 +166,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
         IntWritable.class.getName());
     iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
-    MRHelpers.translateVertexConfToTez(iReduceStageConf);
+    MRHelpers.translateMRConfToTez(iReduceStageConf);
 
     Configuration finalReduceConf = new JobConf(conf);
     finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
@@ -174,11 +174,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
         MyOrderByNoOpReducer.class.getName());
     finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
     finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
-    MRHelpers.translateVertexConfToTez(finalReduceConf);
+    MRHelpers.translateMRConfToTez(finalReduceConf);
 
-    MRHelpers.doJobClientMagic(mapStageConf);
-    MRHelpers.doJobClientMagic(iReduceStageConf);
-    MRHelpers.doJobClientMagic(finalReduceConf);
+    MRHelpers.configureMRApiUsage(mapStageConf);
+    MRHelpers.configureMRApiUsage(iReduceStageConf);
+    MRHelpers.configureMRApiUsage(finalReduceConf);
 
     List<Vertex> vertices = new ArrayList<Vertex>();
 
@@ -197,7 +197,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
     }
 
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(mapStageConf))
+        MapProcessor.class.getName()).setUserPayload(
+        TezUtils.createUserPayloadFromConf(mapStageConf))
         .setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
     mapVertex.addDataSource("MRInput", dsd);
     vertices.add(mapVertex);
@@ -207,7 +208,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
     Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
         ReduceProcessor.class.getName())
-            .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
+            .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
             .setHistoryText(iReduceStageHistoryText), 2);
     ivertex.setTaskLocalFiles(commonLocalResources);
     vertices.add(ivertex);
@@ -215,7 +216,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
     finalReduceConf.writeXml(finalReduceOutputStream);
     String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
-    UserPayload finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+    UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
     Vertex finalReduceVertex = new Vertex("finalreduce",
         new ProcessorDescriptor(
             ReduceProcessor.class.getName())

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index f2b7043..aa7b836 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 2a0a1e0..107d63c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.client.MRTezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -379,7 +380,7 @@ public class YARNRunner implements ClientProtocol {
           MRJobConfig.MAPRED_ADMIN_USER_ENV);
     }
 
-    MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
+    MRHelpers.updateEnvBasedOnMRTaskEnv(jobConf, environment, isMap);
   }
 
   private Vertex createVertexForStage(Configuration stageConf,
@@ -408,12 +409,12 @@ public class YARNRunner implements ClientProtocol {
       }
     }
 
-    Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
-        : MRHelpers.getReduceResource(stageConf);
+    Resource taskResource = isMap ? MRHelpers.getResourceForMRMapper(stageConf)
+        : MRHelpers.getResourceForMRReducer(stageConf);
     
     stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");
     
-    UserPayload vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+    UserPayload vertexUserPayload = TezUtils.createUserPayloadFromConf(stageConf);
     Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload),
         numTasks, taskResource);
     if (isMap) {
@@ -437,8 +438,8 @@ public class YARNRunner implements ClientProtocol {
     // here
     taskLocalResources.putAll(jobLocalResources);
 
-    String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
-        : MRHelpers.getReduceJavaOpts(stageConf);
+    String taskJavaOpts = isMap ? MRHelpers.getJavaOptsForMRMapper(stageConf)
+        : MRHelpers.getJavaOptsForMRReducer(stageConf);
 
     vertex.setTaskEnvironment(taskEnv)
         .setTaskLocalFiles(taskLocalResources)
@@ -561,7 +562,7 @@ public class YARNRunner implements ClientProtocol {
 
     // Transform all confs to use Tez keys
     for (int i = 0; i < stageConfs.length; i++) {
-      MRHelpers.translateVertexConfToTez(stageConfs[i]);
+      MRHelpers.translateMRConfToTez(stageConfs[i]);
     }
 
     // create inputs to tezClient.submit()
@@ -600,7 +601,7 @@ public class YARNRunner implements ClientProtocol {
     Map<String, String> environment = new HashMap<String, String>();
 
     // Setup the environment variables for AM
-    MRHelpers.updateEnvironmentForMRAM(conf, environment);
+    MRHelpers.updateEnvBasedOnMRAMEnv(conf, environment);
     StringBuilder envStrBuilder = new StringBuilder();
     boolean first = true;
     for (Entry<String, String> entry : environment.entrySet()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 021c82d..ba60074 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -33,11 +33,11 @@ import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.runtime.api.OutputCommitter;
 import org.apache.tez.runtime.api.OutputCommitterContext;
@@ -65,7 +65,7 @@ public class MROutputCommitter extends OutputCommitter {
       jobConf = new JobConf();
     } else {
       jobConf = new JobConf(
-          MRHelpers.createConfFromUserPayload(userPayload));
+          TezUtils.createConfFromUserPayload(userPayload));
     }
     
     // Read all credentials into the credentials instance stored in JobConf.

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index c03d4bb..e418306 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -28,10 +28,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -72,7 +71,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
     if (LOG.isDebugEnabled()) {
       sw.reset().start();
     }
-    Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+    Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
         .getConfigurationBytes());
     
     sendSerializedEvents = conf.getBoolean(

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 7f0ec4e..4555c43 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -25,7 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.lib.MRInputUtils;
@@ -67,7 +67,7 @@ public class MRInputSplitDistributor extends InputInitializer {
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());  
     }
-    Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+    Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
         .getConfigurationBytes());
     JobConf jobConf = new JobConf(conf);
     boolean useNewApi = jobConf.getUseNewMapper();

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index c100177..0ba6e38 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -20,57 +20,96 @@ package org.apache.tez.mapreduce.hadoop;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezYARNUtils;
-import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
-import com.google.protobuf.ByteString;
-
 
+/**
+ * This class contains helper methods for frameworks which migrate from MapReduce to Tez, and need
+ * to continue to work with existing MapReduce configurations.
+ */
 public class MRHelpers {
 
   private static final Log LOG = LogFactory.getLog(MRHelpers.class);
 
 
-
-
   /**
-   * Translates MR keys to Tez for the provided conf. The conversion is
-   * done in place.
+   * Translate MapReduce configuration keys to the equivalent Tez keys in the provided
+   * configuration. The translation is done in place. <p>
+   * This method is meant to be used by frameworks which rely upon existing MapReduce configuration
+   * instead of setting up their own.
    *
-   * @param conf
-   *          mr based configuration to be translated to tez
+   * @param conf mr based configuration to be translated to tez
    */
-  @LimitedPrivate("Hive, Pig")
-  @Unstable
-  public static void translateVertexConfToTez(Configuration conf) {
+  public static void translateMRConfToTez(Configuration conf) {
     convertVertexConfToTez(conf);
   }
 
+  /**
+   * Update the provided configuration to use the new API (mapreduce) or the old API (mapred) based
+   * on the configured InputFormat, OutputFormat, Partitioner etc. Also ensures that keys not
+   * required by a particular mode are not present.
+   * <p>
+   * This method should be invoked after completely setting up the configuration.
+   * <p>
+   * Defaults to using the new API if relevant keys are not present.
+   *
+   */
+  public static void configureMRApiUsage(Configuration conf) {
+    String oldMapperClass = "mapred.mapper.class";
+    conf.setBooleanIfUnset("mapred.mapper.new-api", conf.get(oldMapperClass) == null);
+    try {
+      if (conf.getBoolean("mapred.mapper.new-api", false)) {
+        String mode = "new map API";
+        ensureNotSet(conf, "mapred.input.format.class", mode);
+        ensureNotSet(conf, oldMapperClass, mode);
+      } else {
+        String mode = "map compatability";
+        ensureNotSet(conf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
+        ensureNotSet(conf, MRJobConfig.MAP_CLASS_ATTR, mode);
+      }
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+  }
+
   private static void convertVertexConfToTez(Configuration vertexConf) {
     setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown");
     processDirectConversion(vertexConf);
+    setupMRComponents(vertexConf);
+  }
+
+  private static void setupMRComponents(Configuration conf) {
+    if (conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS) == null) {
+      conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+          MRPartitioner.class.getName());
+    }
+
+    if (conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS) == null) {
+      boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
+      if (useNewApi) {
+        if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
+          conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
+        }
+      } else {
+        if (conf.get("mapred.combiner.class") != null) {
+          conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
+        }
+      }
+    }
   }
 
   /**
@@ -153,10 +192,17 @@ public class MRHelpers {
     }
   }
 
+  private static void ensureNotSet(Configuration conf, String attr, String msg)
+      throws IOException {
+    if (conf.get(attr) != null) {
+      throw new IOException(attr + " is incompatible with " + msg + " mode.");
+    }
+  }
+
   private static String getLog4jCmdLineProperties(Configuration conf,
       boolean isMap) {
     Vector<String> logProps = new Vector<String>(4);
-    addLog4jSystemProperties(getChildLogLevel(conf, isMap), logProps);
+    TezUtils.addLog4jSystemProperties(getChildLogLevel(conf, isMap), logProps);
     StringBuilder sb = new StringBuilder();
     for (String str : logProps) {
       sb.append(str).append(" ");
@@ -165,21 +211,34 @@ public class MRHelpers {
   }
 
   /**
-   * Add the JVM system properties necessary to configure
-   * {@link ContainerLogAppender}.
+   * Generate JVM options based on MapReduce AM java options.
+   * <p>
+   * This is only meant to be used if frameworks are not setting up their own java options or
+   * relying on the defaults specified by Tez, and instead want to use the options which may already
+   * have been configured for an MR AppMaster.
    *
-   * @param logLevel
-   *          the desired log level (eg INFO/WARN/DEBUG)
-   * @param vargs
-   *          the argument list to append to
+   * @param conf Configuration to be used to extract JVM opts specific info
+   * @return JAVA_OPTS string to be used in launching the JVM
    */
-  public static void addLog4jSystemProperties(String logLevel,
-      List<String> vargs) {
-    TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
+  public static String getJavaOptsForMRAM(Configuration conf) {
+    // Admin opts
+    String mrAppMasterAdminOptions = conf.get(
+        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+    // Add AM user command opts
+    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+
+    return mrAppMasterAdminOptions.trim()
+        + " " + mrAppMasterUserOptions.trim();
   }
 
   /**
-   * Generate JVM options to be used to launch map tasks
+   * Generate JVM options based on MapReduce mapper java options.
+   * <p>
+   * This is only meant to be used if frameworks are not setting up their own java options,
+   * and would like to fall back to using java options which may already be configured for
+   * Hadoop MapReduce mappers.
    *
    * Uses mapreduce.admin.map.child.java.opts, mapreduce.map.java.opts and
    * mapreduce.map.log.level from config to generate the opts.
@@ -188,7 +247,7 @@ public class MRHelpers {
    * @return JAVA_OPTS string to be used in launching the JVM
    */
   @SuppressWarnings("deprecation")
-  public static String getMapJavaOpts(Configuration conf) {
+  public static String getJavaOptsForMRMapper(Configuration conf) {
     String adminOpts = conf.get(
         MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
         MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
@@ -204,7 +263,11 @@ public class MRHelpers {
   }
 
   /**
-   * Generate JVM options to be used to launch reduce tasks
+   * Generate JVM options based on MapReduce reducer java options.
+   * <p>
+   * This is only meant to be used if frameworks are not setting up their own java options,
+   * and would like to fall back to using java options which may already be configured for
+   * Hadoop MapReduce reducers.
    *
    * Uses mapreduce.admin.reduce.child.java.opts, mapreduce.reduce.java.opts
    * and mapreduce.reduce.log.level from config to generate the opts.
@@ -213,7 +276,7 @@ public class MRHelpers {
    * @return JAVA_OPTS string to be used in launching the JVM
    */
   @SuppressWarnings("deprecation")
-  public static String getReduceJavaOpts(Configuration conf) {
+  public static String getJavaOptsForMRReducer(Configuration conf) {
     String adminOpts = conf.get(
         MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
         MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
@@ -229,146 +292,12 @@ public class MRHelpers {
   }
 
   /**
-   * Sets up parameters which used to be set by the MR JobClient. Includes
-   * setting whether to use the new api or the old api. Note: Must be called
-   * before generating InputSplits
-   *
-   * @param conf
-   *          configuration for the vertex.
-   */
-  public static void doJobClientMagic(Configuration conf) throws IOException {
-    setUseNewAPI(conf);
-    // TODO Maybe add functionality to check output specifications - e.g. fail
-    // early if the output directory exists.
-    InetAddress ip = InetAddress.getLocalHost();
-    if (ip != null) {
-      String submitHostAddress = ip.getHostAddress();
-      String submitHostName = ip.getHostName();
-      conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
-      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
-    }
-    // conf.set("hadoop.http.filter.initializers",
-    // "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
-    // Skipping setting JOB_DIR - not used by AM.
-
-    // Maybe generate SHUFFLE secret. The AM uses the job token generated in
-    // the AM anyway.
-
-    // TODO eventually ACLs
-    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
-    
-    boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
-    if (useNewApi) {
-      if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
-        conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
-      }
-    } else {
-      if (conf.get("mapred.combiner.class") != null) {
-        conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
-      }
-    }
-    
-    setWorkingDirectory(conf);
-  }
-
-  private static void setWorkingDirectory(Configuration conf) {
-    String name = conf.get(JobContext.WORKING_DIR);
-    if (name == null) {
-      try {
-        Path dir = FileSystem.get(conf).getWorkingDirectory();
-        conf.set(JobContext.WORKING_DIR, dir.toString());
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * Default to the new APIs unless they are explicitly set or the old mapper or
-   * reduce attributes are used.
-   *
-   * @throws IOException
-   *           if the configuration is inconsistant
-   */
-  private static void setUseNewAPI(Configuration conf) throws IOException {
-    int numReduces = conf.getInt(MRJobConfig.NUM_REDUCES, 1);
-    String oldMapperClass = "mapred.mapper.class";
-    String oldReduceClass = "mapred.reducer.class";
-    conf.setBooleanIfUnset("mapred.mapper.new-api",
-        conf.get(oldMapperClass) == null);
-    if (conf.getBoolean("mapred.mapper.new-api", false)) {
-      String mode = "new map API";
-      ensureNotSet(conf, "mapred.input.format.class", mode);
-      ensureNotSet(conf, oldMapperClass, mode);
-      if (numReduces != 0) {
-        ensureNotSet(conf, "mapred.partitioner.class", mode);
-      } else {
-        ensureNotSet(conf, "mapred.output.format.class", mode);
-      }
-    } else {
-      String mode = "map compatability";
-      ensureNotSet(conf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
-      ensureNotSet(conf, MRJobConfig.MAP_CLASS_ATTR, mode);
-      if (numReduces != 0) {
-        ensureNotSet(conf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
-      } else {
-        ensureNotSet(conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-      }
-    }
-    if (numReduces != 0) {
-      conf.setBooleanIfUnset("mapred.reducer.new-api",
-          conf.get(oldReduceClass) == null);
-      if (conf.getBoolean("mapred.reducer.new-api", false)) {
-        String mode = "new reduce API";
-        ensureNotSet(conf, "mapred.output.format.class", mode);
-        ensureNotSet(conf, oldReduceClass, mode);
-      } else {
-        String mode = "reduce compatability";
-        ensureNotSet(conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
-        ensureNotSet(conf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
-      }
-    }
-  }
-
-  private static void ensureNotSet(Configuration conf, String attr, String msg)
-      throws IOException {
-    if (conf.get(attr) != null) {
-      throw new IOException(attr + " is incompatible with " + msg + " mode.");
-    }
-  }
-
-  @LimitedPrivate("Hive, Pig")
-  @Unstable
-  public static UserPayload createUserPayloadFromConf(Configuration conf)
-      throws IOException {
-    return TezUtils.createUserPayloadFromConf(conf);
-  }
-  
-  @LimitedPrivate("Hive, Pig")
-  public static ByteString createByteStringFromConf(Configuration conf)
-      throws IOException {
-    return TezUtils.createByteStringFromConf(conf);
-  }
-
-  @LimitedPrivate("Hive, Pig")
-  @Unstable
-  public static Configuration createConfFromUserPayload(UserPayload payload)
-      throws IOException {
-    return TezUtils.createConfFromUserPayload(payload);
-  }
-
-  @LimitedPrivate("Hive, Pig")
-  public static Configuration createConfFromByteString(ByteString bs)
-      throws IOException {
-    return TezUtils.createConfFromByteString(bs);
-  }
-
-  /**
-   * Extract the map task's container resource requirements from the
-   * provided configuration.
-   *
-   * Uses mapreduce.map.memory.mb and mapreduce.map.cpu.vcores from the
-   * provided configuration.
+   * Extract the container resource requirements from the provided configuration, which would
+   * otherwise have been used when running a Hadoop MapReduce mapper.
+   * <p>
+   * This is only meant to be used if frameworks are not setting up their own {@link
+   * org.apache.hadoop.yarn.api.records.Resource} and would like to fall back to using resources
+   * which may already be configured for Hadoop MapReduce mappers.
    *
    * @param conf Configuration with MR specific settings used to extract
    * information from
@@ -376,7 +305,7 @@ public class MRHelpers {
    * @return Resource object used to define requirements for containers
    * running Map tasks
    */
-  public static Resource getMapResource(Configuration conf) {
+  public static Resource getResourceForMRMapper(Configuration conf) {
     return Resource.newInstance(conf.getInt(
         MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB),
         conf.getInt(MRJobConfig.MAP_CPU_VCORES,
@@ -384,33 +313,38 @@ public class MRHelpers {
   }
 
   /**
-   * Extract the reduce task's container resource requirements from the
-   * provided configuration.
-   *
+   * Extract the container resource requirements from the provided configuration, which would
+   * otherwise have been used when running a Hadoop MapReduce reducer.
+   * <p>
+   * This is only meant to be used if frameworks are not setting up their own {@link
+   * org.apache.hadoop.yarn.api.records.Resource} and would like to fall back to using resources
+   * which may already be configured for Hadoop MapReduce reducers.
+   * <p>
    * Uses mapreduce.reduce.memory.mb and mapreduce.reduce.cpu.vcores from the
    * provided configuration.
    *
    * @param conf Configuration with MR specific settings used to extract
-   * information from
-   *
+   *             information from
    * @return Resource object used to define requirements for containers
    * running Reduce tasks
    */
-  public static Resource getReduceResource(Configuration conf) {
+  public static Resource getResourceForMRReducer(Configuration conf) {
     return Resource.newInstance(conf.getInt(
-        MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
+            MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
         conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
             MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
   }
 
   /**
-   * Setup classpath and other environment variables
-   * @param conf Configuration to retrieve settings from
+   * Setup classpath and other environment variables based on the configured values for MR Mappers
+   * or Reducers.
+   *
+   * @param conf        Configuration to retrieve settings from
    * @param environment Environment to update
-   * @param isMap Whether task is a map or reduce task
+   * @param isMap       Whether task is a map or reduce task
    */
-  public static void updateEnvironmentForMRTasks(Configuration conf,
-      Map<String, String> environment, boolean isMap) {
+  public static void updateEnvBasedOnMRTaskEnv(Configuration conf,
+                                               Map<String, String> environment, boolean isMap) {
     // Shell
     environment.put(Environment.SHELL.name(), conf.get(
         MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
@@ -437,58 +371,16 @@ public class MRHelpers {
         getChildLogLevel(conf, isMap) + ",CLA");
   }
 
-  private static Configuration getBaseJobConf(Configuration conf) {
-    if (conf != null) {
-      return new JobConf(conf);
-    } else {
-      return new JobConf();
-    }
-  }
-
-  /**
-   * Get default initialize JobConf-based configuration
-   * @param conf Conf to initialize JobConf with.
-   * @return Base configuration for MR-based jobs
-   */
-  public static Configuration getBaseMRConfiguration(Configuration conf) {
-    return getBaseJobConf(conf);
-  }
-
   /**
-   * Get default initialize JobConf-based configuration
-   * @return Base configuration for MR-based jobs
-   */
-  static Configuration getBaseMRConfiguration() {
-    return getBaseJobConf(null);
-  }
-
-  /**
-   * Setup environment for the AM based on MR-based configuration
+   * Setup environment variables based on the configured values for the MR AM
    * @param conf Configuration from which to extract information
    * @param environment Environment map to update
    */
-  public static void updateEnvironmentForMRAM(Configuration conf, Map<String, String> environment) {
+  public static void updateEnvBasedOnMRAMEnv(Configuration conf, Map<String, String> environment) {
     TezYARNUtils.appendToEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV),
         File.pathSeparator);
     TezYARNUtils.appendToEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ENV),
         File.pathSeparator);
   }
 
-  /**
-   * Extract Java Opts for the AM based on MR-based configuration
-   * @param conf Configuration from which to extract information
-   * @return Java opts for the AM
-   */
-  public static String getMRAMJavaOpts(Configuration conf) {
-    // Admin opts
-    String mrAppMasterAdminOptions = conf.get(
-        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
-        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
-    // Add AM user command opts
-    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
-        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
-
-    return mrAppMasterAdminOptions.trim()
-        + " " + mrAppMasterUserOptions.trim();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 6b8ed83..0143df9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -206,8 +206,8 @@ public class MRInput extends MRInputBase {
       } catch (Exception e) {
         throw new TezUncheckedException(e);
       }
-      MRHelpers.translateVertexConfToTez(inputConf);
-      MRHelpers.doJobClientMagic(inputConf);
+      MRHelpers.translateMRConfToTez(inputConf);
+      MRHelpers.configureMRApiUsage(inputConf);
       UserPayload payload = MRInputHelpersInternal.createMRInputPayload(inputConf, inputSplitInfo.getSplitsProto());
       Credentials credentials = null;
       if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
@@ -224,8 +224,8 @@ public class MRInput extends MRInputBase {
       Configuration inputConf = new JobConf(conf);
       setupBasicConf(inputConf);
 
-      MRHelpers.translateVertexConfToTez(inputConf);
-      MRHelpers.doJobClientMagic(inputConf);
+      MRHelpers.translateMRConfToTez(inputConf);
+      MRHelpers.configureMRApiUsage(inputConf);
 
       Credentials credentials = maybeGetCredentials();
 
@@ -243,8 +243,8 @@ public class MRInput extends MRInputBase {
     private DataSourceDescriptor createGeneratorDataSource() throws IOException {
       Configuration inputConf = new JobConf(conf);
       setupBasicConf(inputConf);
-      MRHelpers.translateVertexConfToTez(inputConf);
-      MRHelpers.doJobClientMagic(inputConf);
+      MRHelpers.translateMRConfToTez(inputConf);
+      MRHelpers.configureMRApiUsage(inputConf);
       
       Credentials credentials = maybeGetCredentials();