You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/05 22:05:40 UTC

[1/2] TEZ-1317. Simplify MRinput/MROutput configuration (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 5ae48c6ef -> 1831fda98


http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index d999780..d6e2989 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -27,8 +27,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
 import org.apache.tez.runtime.api.TezInputContext;
 import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
@@ -42,6 +44,30 @@ public class MRInputLegacy extends MRInput {
   private ReentrantLock eventLock = new ReentrantLock();
   private Condition eventCondition = eventLock.newCondition();
 
+  
+  /**
+   * Create an {@link MRInputConfigurer}
+   * @param conf Configuration for the {@link MRInputLegacy}
+   * @param inputFormat InputFormat derived class
+   * @return {@link MRInputConfigurer}
+   */
+  public static MRInputConfigurer createtConfigurer(Configuration conf, Class<?> inputFormat) {
+    return MRInput.createConfigurer(conf, inputFormat).setInputClassName(MRInputLegacy.class.getName());
+  }
+
+  /**
+   * Create an {@link MRInputConfigurer} for a FileInputFormat
+   * @param conf Configuration for the {@link MRInputLegacy}
+   * @param inputFormat FileInputFormat derived class
+   * @param inputPaths Comma separated input paths
+   * @return {@link MRInputConfigurer}
+   */
+  public static MRInputConfigurer createConfigurer(Configuration conf, Class<?> inputFormat,
+      String inputPaths) {
+    return MRInput.createConfigurer(conf, inputFormat, inputPaths).setInputClassName(
+        MRInputLegacy.class.getName());
+  }
+  
   public MRInputLegacy(TezInputContext inputContext, int numPhysicalInputs) {
     super(inputContext, numPhysicalInputs);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index bda3f1b..709a2c4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.output;
 
 import java.io.IOException;
 import java.text.NumberFormat;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -35,11 +36,18 @@ import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -51,7 +59,135 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
+
 public class MROutput extends AbstractLogicalOutput {
+  
+  /**
+   * Helper class to configure {@link MROutput}
+   *
+   */
+  public static class MROutputConfigurer {
+    final Configuration conf;
+    final Class<?> outputFormat;
+    boolean useNewApi;
+    boolean getCredentialsForSinkFilesystem = true;
+    String outputPath;
+    
+    private MROutputConfigurer(Configuration conf, Class<?> outputFormat) {
+      this.conf = conf;
+      this.outputFormat = outputFormat;
+      if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom(outputFormat)) {
+        useNewApi = false;
+      } else if(org.apache.hadoop.mapreduce.OutputFormat.class.isAssignableFrom(outputFormat)) {
+        useNewApi = true;
+      } else {
+        throw new TezUncheckedException("outputFormat must be assignable from either " +
+            "org.apache.hadoop.mapred.OutputFormat or " +
+            "org.apache.hadoop.mapreduce.OutputFormat" +
+            " Given: " + outputFormat.getName());
+      }
+    }
+
+    private MROutputConfigurer setOutputPath(String outputPath) {
+      if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom(outputFormat) || 
+          FileOutputFormat.class.isAssignableFrom(outputFormat))) {
+        throw new TezUncheckedException("When setting outputPath the outputFormat must " + 
+            "be assignable from either org.apache.hadoop.mapred.FileOutputFormat or " +
+            "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat. " +
+            "Otherwise use the non-path configurer." + 
+            " Given: " + outputFormat.getName());
+      }
+      conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, outputPath);
+      this.outputPath = outputPath;
+      return this;
+    }
+    
+    /**
+     * Create the {@link DataSinkDescriptor}
+     * @return {@link DataSinkDescriptor}
+     */
+    public DataSinkDescriptor create() {
+      Credentials credentials = null;
+      if (getCredentialsForSinkFilesystem && outputPath != null) {
+        try {
+          Path path = new Path(outputPath);
+          FileSystem fs;
+          fs = path.getFileSystem(conf);
+          Path qPath = fs.makeQualified(path);
+          credentials = new Credentials();
+          TezClientUtils.addFileSystemCredentialsFromURIs(Collections.singletonList(qPath.toUri()),
+              credentials, conf);
+        } catch (IOException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
+      
+      return new DataSinkDescriptor(
+          new OutputDescriptor(MROutput.class.getName()).setUserPayload(createUserPayload(conf,
+              outputFormat.getName(), useNewApi)), new OutputCommitterDescriptor(
+              MROutputCommitter.class.getName()), credentials);
+    }
+    
+    /**
+     * Get the credentials for the output from its {@link FileSystem}s
+     * Use the method to turn this off when not using a {@link FileSystem}
+     * or when {@link Credentials} are not supported
+     * @param value whether to get credentials or not. (true by default)
+     * @return {@link MROutputConfigurer}
+     */
+    public MROutputConfigurer getCredentialsForSinkFileSystem(boolean value) {
+      getCredentialsForSinkFilesystem = value;
+      return this;
+    }
+
+    /**
+     * Creates the user payload to be set on the OutputDescriptor for MROutput
+     * @param conf Configuration for the OutputFormat
+     * @param outputFormatName Name of the class of the OutputFormat
+     * @param useNewApi Use new mapreduce API or old mapred API
+     * @return
+     * @throws IOException
+     */
+    private byte[] createUserPayload(Configuration conf, 
+        String outputFormatName, boolean useNewApi) {
+      Configuration outputConf = new JobConf(conf);
+      outputConf.setBoolean("mapred.reducer.new-api", useNewApi);
+      if (useNewApi) {
+        outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
+      } else {
+        outputConf.set("mapred.output.format.class", outputFormatName);
+      }
+      MRHelpers.translateVertexConfToTez(outputConf);
+      try {
+        MRHelpers.doJobClientMagic(outputConf);
+        return TezUtils.createUserPayloadFromConf(outputConf);
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+  }
+  
+  /**
+   * Create an {@link MROutputConfigurer}
+   * @param conf Configuration for the {@link MROutput}
+   * @param outputFormat OutputFormat derived class
+   * @return {@link MROutputConfigurer}
+   */
+  public static MROutputConfigurer createtConfigurer(Configuration conf, Class<?> outputFormat) {
+    return new MROutputConfigurer(conf, outputFormat);
+  }
+
+  /**
+   * Create an {@link MROutputConfigurer} for a FileOutputFormat
+   * @param conf Configuration for the {@link MROutput}
+   * @param outputFormat FileInputFormat derived class
+   * @param outputPath Output path
+   * @return {@link MROutputConfigurer}
+   */
+  public static MROutputConfigurer createConfigurer(Configuration conf, Class<?> outputFormat,
+      String outputPath) {
+    return new MROutputConfigurer(conf, outputFormat).setOutputPath(outputPath);
+  }
 
   private static final Log LOG = LogFactory.getLog(MROutput.class);
 
@@ -85,24 +221,6 @@ public class MROutput extends AbstractLogicalOutput {
     super(outputContext, numPhysicalOutputs);
   }
 
-  /**
-   * Creates the user payload to be set on the OutputDescriptor for MROutput
-   * @param conf Configuration for the OutputFormat
-   * @param outputFormatName Name of the class of the OutputFormat
-   * @param useNewApi Use new mapreduce API or old mapred API
-   * @return
-   * @throws IOException
-   */
-  public static byte[] createUserPayload(Configuration conf, 
-      String outputFormatName, boolean useNewApi) throws IOException {
-    Configuration outputConf = new JobConf(conf);
-    outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
-    outputConf.setBoolean("mapred.mapper.new-api", useNewApi);
-    MRHelpers.translateVertexConfToTez(outputConf);
-    MRHelpers.doJobClientMagic(outputConf);
-    return TezUtils.createUserPayloadFromConf(outputConf);
-  }
-
   @Override
   public List<Event> initialize() throws IOException, InterruptedException {
     LOG.info("Initializing Simple Output");

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java
deleted file mode 100644
index cf65abd..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.tez.dag.api.EdgeProperty;
-
-@InterfaceAudience.Private
-abstract class HadoopKeyValuesBasedBaseConf {
-
-  /**
-   * Get the payload for the configured Output
-   * @return output configuration as a byte array
-   */
-  public abstract byte[] getOutputPayload();
-
-  /**
-   * Get the output class name
-   * @return the output class name
-   */
-  public abstract String getOutputClassName();
-
-  /**
-   * Get the payload for the configured Input
-   * @return input configuration as a byte array
-   */
-  public abstract byte[] getInputPayload();
-
-  /**
-   * Get the input class name
-   * @return the input class name
-   */
-  public abstract String getInputClassName();
-
-  public abstract static class Builder<T extends Builder<T>> implements BaseConfigurer<T> {
-
-    /**
-     * Enable compression for the specific Input / Output / Edge
-     *
-     * @param compressionCodec the codec to be used. null implies using the default
-     * @return instance of the current builder
-     */
-    public T enableCompression(String compressionCodec) {
-      return (T) this;
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
new file mode 100644
index 0000000..35d023f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
+
+  /**
+   * Get the payload for the configured Output
+   * @return output configuration as a byte array
+   */
+  public abstract byte[] getOutputPayload();
+
+  /**
+   * Get the output class name
+   * @return the output class name
+   */
+  public abstract String getOutputClassName();
+
+  /**
+   * Get the payload for the configured Input
+   * @return input configuration as a byte array
+   */
+  public abstract byte[] getInputPayload();
+
+  /**
+   * Get the input class name
+   * @return the input class name
+   */
+  public abstract String getInputClassName();
+
+  public abstract static class Builder<T extends Builder<T>> implements BaseConfigurer<T> {
+
+    /**
+     * Enable compression for the specific Input / Output / Edge
+     *
+     * @param compressionCodec the codec to be used. null implies using the default
+     * @return instance of the current builder
+     */
+    public T enableCompression(String compressionCodec) {
+      return (T) this;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
index 97712a3..fed20ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
@@ -75,7 +75,7 @@ public class OnFileSortedOutputConfiguration {
   @SuppressWarnings("rawtypes")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
       SpecificConfigurer<SpecificBuilder> {
 
     private final E edgeBuilder;

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
index 17bb3b8..e8e8be1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
@@ -48,7 +48,7 @@ public class OnFileUnorderedKVOutputConfiguration {
 
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
       SpecificConfigurer<SpecificBuilder> {
 
     private final E edgeBuilder;

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
index 2967501..95a3a3c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
@@ -56,7 +56,7 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration {
   @SuppressWarnings("rawtypes")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
       SpecificConfigurer<SpecificBuilder> {
 
     private final E edgeBuilder;

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index 4da5060..f54f183 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -36,7 +36,7 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseConf {
+public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
 
   private final OnFileSortedOutputConfiguration outputConf;
   private final ShuffledMergedInputConfiguration inputConf;
@@ -118,7 +118,7 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
 
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
 
     private final OnFileSortedOutputConfiguration.Builder outputBuilder =
         new OnFileSortedOutputConfiguration.Builder();

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
index d6abf03..dbdd2dc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
@@ -119,7 +119,7 @@ public class ShuffledMergedInputConfiguration {
   @SuppressWarnings("rawtypes")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
       SpecificConfigurer<SpecificBuilder> {
 
     private final E edgeBuilder;

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
index 7ff8a58..f7501dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
@@ -88,7 +88,7 @@ public class ShuffledUnorderedKVInputConfiguration {
   @SuppressWarnings("rawtypes")
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+  public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
       SpecificConfigurer<SpecificBuilder> {
 
     private final E edgeBuilder;

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
index 3f985f6..d07c68a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
@@ -38,7 +38,7 @@ import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseConf {
+public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
 
   private final OnFileUnorderedPartitionedKVOutputConfiguration outputConf;
   private final ShuffledUnorderedKVInputConfiguration inputConf;
@@ -123,7 +123,7 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
 
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
 
     private final OnFileUnorderedPartitionedKVOutputConfiguration.Builder outputBuilder =
         new OnFileUnorderedPartitionedKVOutputConfiguration.Builder();

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
index 8adfad9..6ebb259 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
@@ -39,7 +39,7 @@ import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseConf {
+public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
   private final OnFileUnorderedKVOutputConfiguration outputConf;
   private final ShuffledUnorderedKVInputConfiguration inputConf;
 
@@ -140,7 +140,7 @@ public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBased
 
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
-  public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+  public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
 
     private final OnFileUnorderedKVOutputConfiguration.Builder outputBuilder =
         new OnFileUnorderedKVOutputConfiguration.Builder();

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 0a3c06b..ae43022 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -181,9 +182,12 @@ public class TestDAGRecovery {
   @Test(timeout=120000)
   public void testDelayedInit() throws Exception {
     DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null);
-    dag.getVertex("v1").addDataSource("i1",
-        new InputDescriptor(NoOpInput.class.getName()),
-        new InputInitializerDescriptor(FailingInputInitializer.class.getName()));
+    dag.getVertex("v1").addDataSource(
+        "i1",
+        new DataSourceDescriptor(
+            new InputDescriptor(NoOpInput.class.getName()),
+            new InputInitializerDescriptor(FailingInputInitializer.class
+                .getName()), null));
     runDAGAndVerify(dag, State.SUCCEEDED);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index ca8f00b..003f615 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -180,7 +181,7 @@ public class TestDAGRecovery2 {
             .toUserPayload());
     OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(
         MultiAttemptDAG.FailingOutputCommitter.class.getName());
-    dag.getVertex("v3").addDataSink("FailingOutput", od, ocd);
+    dag.getVertex("v3").addDataSink("FailingOutput", new DataSinkDescriptor(od, ocd, null));
     runDAGAndVerify(dag, State.FAILED);
   }
 


[2/2] git commit: TEZ-1317. Simplify MRinput/MROutput configuration (bikas)

Posted by bi...@apache.org.
TEZ-1317. Simplify MRinput/MROutput configuration (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1831fda9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1831fda9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1831fda9

Branch: refs/heads/master
Commit: 1831fda98497101566665d6157ceec02ff2166a4
Parents: 5ae48c6
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Aug 5 13:05:32 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Aug 5 13:05:32 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/tez/client/TezClientUtils.java   |  36 ++-
 .../main/java/org/apache/tez/dag/api/DAG.java   |  29 +-
 .../apache/tez/dag/api/DataSinkDescriptor.java  |  72 +++++
 .../tez/dag/api/DataSourceDescriptor.java       | 127 +++++++++
 .../java/org/apache/tez/dag/api/Vertex.java     |  64 +++--
 .../org/apache/tez/dag/api/VertexGroup.java     |   7 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  18 +-
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   5 +-
 .../tez/dag/history/utils/TestDAGUtils.java     |  10 +-
 .../mapreduce/examples/FilterLinesByWord.java   |  15 +-
 .../examples/FilterLinesByWordOneToOne.java     |  21 +-
 .../mapreduce/examples/IntersectDataGen.java    |  46 +---
 .../mapreduce/examples/IntersectExample.java    |  67 +----
 .../mapreduce/examples/IntersectValidate.java   |  46 +---
 .../tez/mapreduce/examples/MRRSleepJob.java     |  56 ++--
 .../tez/mapreduce/examples/UnionExample.java    |  60 ++---
 .../tez/mapreduce/examples/WordCount.java       |  33 +--
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  |   7 +-
 .../org/apache/tez/mapreduce/input/MRInput.java | 266 +++++++++++++++----
 .../tez/mapreduce/input/MRInputLegacy.java      |  26 ++
 .../apache/tez/mapreduce/output/MROutput.java   | 154 +++++++++--
 .../conf/HadoopKeyValuesBasedBaseConf.java      |  65 -----
 .../HadoopKeyValuesBasedBaseEdgeConfigurer.java |  64 +++++
 .../conf/OnFileSortedOutputConfiguration.java   |   2 +-
 .../OnFileUnorderedKVOutputConfiguration.java   |   2 +-
 ...orderedPartitionedKVOutputConfiguration.java |   2 +-
 .../OrderedPartitionedKVEdgeConfigurer.java     |   4 +-
 .../conf/ShuffledMergedInputConfiguration.java  |   2 +-
 .../ShuffledUnorderedKVInputConfiguration.java  |   2 +-
 .../UnorderedPartitionedKVEdgeConfigurer.java   |   4 +-
 .../UnorderedUnpartitionedKVEdgeConfigurer.java |   4 +-
 .../org/apache/tez/test/TestDAGRecovery.java    |  10 +-
 .../org/apache/tez/test/TestDAGRecovery2.java   |   3 +-
 34 files changed, 877 insertions(+), 453 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 072fe55..5e4f2d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,7 @@ INCOMPATIBLE CHANGES
   Processor/Input/Output classes
   TEZ-1351. MROutput needs a flush method to ensure data is materialized for
   FileOutputCommitter
+  TEZ-1317. Simplify MRinput/MROutput configuration
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index dda30c2..a82393f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -271,6 +271,29 @@ public class TezClientUtils {
     }
     return fs;
   }
+  
+  /**
+   * Populate {@link Credentials} for the URI's to access them from their {@link FileSystem}s
+   * @param uris URIs that need to be accessed
+   * @param credentials Credentials object into which to add the credentials
+   * @param conf Configuration to access the FileSystem
+   * @throws IOException
+   */
+  public static void addFileSystemCredentialsFromURIs(Collection<URI> uris, Credentials credentials,
+      Configuration conf) throws IOException {
+    // Obtain Credentials for any paths that the user may have configured.
+    if (uris != null && !uris.isEmpty()) {
+      Iterator<Path> pathIter = Iterators.transform(uris.iterator(), new Function<URI, Path>() {
+        @Override
+        public Path apply(URI input) {
+          return new Path(input);
+        }
+      });
+
+      Path[] paths = Iterators.toArray(pathIter, Path.class);
+      TokenCache.obtainTokensForFileSystems(credentials, paths, conf);
+    }
+  }
 
   /**
    * Obtains tokens for the DAG based on the list of URIs setup in the DAG. The
@@ -303,18 +326,7 @@ public class TezClientUtils {
     // Add additional credentials based on any URIs that the user may have specified.
     
     // Obtain Credentials for any paths that the user may have configured.
-    Collection<URI> uris = dag.getURIsForCredentials();
-    if (uris != null && !uris.isEmpty()) {
-      Iterator<Path> pathIter = Iterators.transform(uris.iterator(), new Function<URI, Path>() {
-        @Override
-        public Path apply(URI input) {
-          return new Path(input);
-        }
-      });
-
-      Path[] paths = Iterators.toArray(pathIter, Path.class);
-      TokenCache.obtainTokensForFileSystems(dagCredentials, paths, conf);
-    }
+    addFileSystemCredentialsFromURIs(dag.getURIsForCredentials(), dagCredentials, conf);
 
     // Obtain Credentials for the local resources configured on the DAG
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 15d23e3..e754990 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -72,7 +72,7 @@ public class DAG {
   final Set<Edge> edges = Sets.newHashSet();
   final String name;
   final Collection<URI> urisForCredentials = new HashSet<URI>();
-  Credentials credentials;
+  Credentials credentials = new Credentials();
   Set<VertexGroup> vertexGroups = Sets.newHashSet();
   Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
 
@@ -134,8 +134,9 @@ public class DAG {
    * need to be obtained so that the job can run. An incremental list of URIs
    * can be provided by making multiple calls to the method.
    * 
-   * Currently, credentials can only be fetched for HDFS and other
-   * {@link org.apache.hadoop.fs.FileSystem} implementations.
+   * Currently, @{link credentials} can only be fetched for HDFS and other
+   * {@link org.apache.hadoop.fs.FileSystem} implementations that support
+   * credentials.
    * 
    * @param uris
    *          a list of {@link URI}s
@@ -530,6 +531,28 @@ public class DAG {
     }
 
     for (Vertex vertex : vertices.values()) {
+      // infer credentials and parallelism from data source
+      List<DataSourceDescriptor> dataSources = vertex.getDataSources();
+      for (DataSourceDescriptor dataSource : dataSources) {
+        if (dataSource.getCredentials() != null) {
+          credentials.addAll(dataSource.getCredentials());
+        }
+      }
+      if (dataSources.size() == 1) {
+        DataSourceDescriptor dataSource = dataSources.get(0);
+        if (vertex.getParallelism() == -1 && dataSource.getNumberOfShards() > -1) {
+          vertex.setParallelism(dataSource.getNumberOfShards());
+        }
+        if (vertex.getLocationHint() == null && dataSource.getLocationHint() != null) {
+          vertex.setLocationHint(dataSource.getLocationHint());
+        }
+      }
+      for (DataSinkDescriptor dataSink : vertex.getDataSinks()) {
+        if (dataSink.getCredentials() != null) {
+          credentials.addAll(dataSink.getCredentials());
+        }
+      }
+      
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
       vertexBuilder.setName(vertex.getName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
new file mode 100644
index 0000000..b5e4e8f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.security.Credentials;
+
+/**
+ * Defines the output and output committer for a data sink 
+ *
+ */
+public class DataSinkDescriptor {
+  private final OutputDescriptor outputDescriptor;
+  private final OutputCommitterDescriptor committerDescriptor;
+  
+  private final Credentials credentials;
+
+  /**
+   * Create a {@link DataSinkDescriptor}
+   * @param outputDescriptor
+   *          An {@link OutputDescriptor} for the output
+   * @param outputCommitterDescriptor
+   *          Specify a committer to be used for the output. Can be null. After all
+   *          tasks in the vertex (or in the DAG) have completed, the committer
+   *          (if specified) is invoked to commit the outputs. Commit is a data
+   *          sink specific operation that usually determines the visibility of
+   *          the output to external observers. E.g. moving output files from
+   *          temporary dirs to the real output dir. When there are multiple
+   *          executions of a task, the commit process also helps decide which
+   *          execution will be included in the final output. Users should
+   *          consider whether their application or data sink need a commit
+   *          operation.
+   * @param credentials Credentials needs to access the data sink
+   */
+  public DataSinkDescriptor(OutputDescriptor outputDescriptor,
+      @Nullable OutputCommitterDescriptor committerDescriptor, 
+      @Nullable Credentials credentials) {
+    this.outputDescriptor = outputDescriptor;
+    this.committerDescriptor = committerDescriptor;
+    this.credentials = credentials;
+  }
+
+  public OutputDescriptor getOutputDescriptor() {
+    return outputDescriptor;
+  }
+  
+  public @Nullable OutputCommitterDescriptor getOutputCommitterDescriptor() {
+    return committerDescriptor;
+  }
+  
+  public @Nullable Credentials getCredentials() {
+    return credentials;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
new file mode 100644
index 0000000..51d85ae
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+
+/**
+ * Defines the input and input initializer for a data source 
+ *
+ */
+public class DataSourceDescriptor {
+  private final InputDescriptor inputDescriptor;
+  private final InputInitializerDescriptor initializerDescriptor;
+  
+  private final Credentials credentials;
+  private final int numShards;
+  private final VertexLocationHint locationHint;
+
+  /**
+   * Create a {@link DataSourceDescriptor} when the data shard calculation 
+   * happens in the App Master at runtime
+   * @param inputDescriptor
+   *          An {@link InputDescriptor} for the Input
+   * @param credentials Credentials needed to access the data
+   * @param inputInitializerDescriptor
+   *          An initializer for this Input which may run within the AM. This
+   *          can be used to set the parallelism for this vertex and generate
+   *          {@link RootInputDataInformationEvent}s for the actual Input.</p>
+   *          If this is not specified, the parallelism must be set for the
+   *          vertex. In addition, the Input should know how to access data for
+   *          each of it's tasks. </p> If a {@link TezRootInputInitializer} is
+   *          meant to determine the parallelism of the vertex, the initial
+   *          vertex parallelism should be set to -1. Can be null.
+   */
+  public DataSourceDescriptor(InputDescriptor inputDescriptor,
+      @Nullable InputInitializerDescriptor initializerDescriptor, 
+      @Nullable Credentials credentials) {
+    this(inputDescriptor, initializerDescriptor, -1, credentials, null);
+  }
+  
+  /**
+   * Create a {@link DataSourceDescriptor} when the data shard calculation
+   * happens in the client at compile time
+   * @param inputDescriptor
+   *          An {@link InputDescriptor} for the Input
+   * @param inputInitializerDescriptor
+   *          An initializer for this Input which may run within the AM. This
+   *          can be used to set the parallelism for this vertex and generate
+   *          {@link RootInputDataInformationEvent}s for the actual Input.</p>
+   *          If this is not specified, the parallelism must be set for the
+   *          vertex. In addition, the Input should know how to access data for
+   *          each of it's tasks. </p> If a {@link TezRootInputInitializer} is
+   *          meant to determine the parallelism of the vertex, the initial
+   *          vertex parallelism should be set to -1. Can be null.
+   * @param numShards Number of shards of data
+   * @param credentials Credentials needed to access the data
+   * @param locationHint Location hints for the vertex tasks
+   */
+  public DataSourceDescriptor(InputDescriptor inputDescriptor,
+      @Nullable InputInitializerDescriptor initializerDescriptor, int numShards,
+      @Nullable Credentials credentials, @Nullable VertexLocationHint locationHint) {
+    this.inputDescriptor = inputDescriptor;
+    this.initializerDescriptor = initializerDescriptor;
+    this.numShards = numShards;
+    this.credentials = credentials;
+    this.locationHint = locationHint;
+  }
+
+  public InputDescriptor getInputDescriptor() {
+    return inputDescriptor;
+  }
+  
+  public @Nullable InputInitializerDescriptor getInputInitializerDescriptor() {
+    return initializerDescriptor;
+  }
+  
+  /**
+   * Number of shards for this data source. If a vertex has only one
+   * data source this the number of tasks in the vertex should be set to 
+   * the number of shards
+   * Returns -1 when this is determined at runtime in the AM.
+   * @return number of tasks
+   */
+  public int getNumberOfShards() {
+    return numShards;
+  }
+  
+  /**
+   * Returns any credentials needed to access this data source.
+   * Is null when this calculation happens on the AppMaster (default)
+   * @return credentials.
+   */
+  public @Nullable Credentials getCredentials() {
+    return credentials;
+  }
+  
+  /**
+   * Get the location hints for the tasks in the vertex for this data source.
+   * Is null when shard calculation happens on the AppMaster (default)
+   * @return List of {@link TaskLocationHint}
+   */
+  public @Nullable VertexLocationHint getLocationHint() {
+    return locationHint;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 25a3990..5f67ad1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -24,17 +24,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import javax.annotation.Nullable;
-
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.api.VertexGroup.GroupInfo;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.runtime.api.LogicalIOProcessor;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 public class Vertex {
@@ -58,6 +55,8 @@ public class Vertex {
   private final List<Edge> inputEdges = new ArrayList<Edge>();
   private final List<Edge> outputEdges = new ArrayList<Edge>();
   private final Map<String, GroupInfo> groupInputs = Maps.newHashMap();
+  private final List<DataSourceDescriptor> dataSources = Lists.newLinkedList();
+  private final List<DataSinkDescriptor> dataSinks = Lists.newLinkedList();
   
   private String taskLaunchCmdOpts = "";
 
@@ -232,24 +231,16 @@ public class Vertex {
    * @param inputName
    *          the name of the input. This will be used when accessing the input
    *          in the {@link LogicalIOProcessor}
-   * @param inputDescriptor
-   *          the inputDescriptor for this input
-   * @param inputInitializerDescriptor
-   *          An initializer for this Input which may run within the AM. This
-   *          can be used to set the parallelism for this vertex and generate
-   *          {@link RootInputDataInformationEvent}s for the actual Input.</p>
-   *          If this is not specified, the parallelism must be set for the
-   *          vertex. In addition, the Input should know how to access data for
-   *          each of it's tasks. </p> If a {@link TezRootInputInitializer} is
-   *          meant to determine the parallelism of the vertex, the initial
-   *          vertex parallelism should be set to -1. Can be null.
+   * @param dataSourceDescriptor
+   *          the @{link DataSourceDescriptor} for this input.
    * @return this Vertex
    */
-  public Vertex addDataSource(String inputName, InputDescriptor inputDescriptor,
-      @Nullable InputInitializerDescriptor inputInitializerDescriptor) {
+  public Vertex addDataSource(String inputName, DataSourceDescriptor dataSourceDescriptor) {
     additionalInputs
         .add(new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
-            inputName, inputDescriptor, inputInitializerDescriptor));
+            inputName, dataSourceDescriptor.getInputDescriptor(),
+            dataSourceDescriptor.getInputInitializerDescriptor()));
+    this.dataSources.add(dataSourceDescriptor);
     return this;
   }
 
@@ -267,25 +258,16 @@ public class Vertex {
    * @param outputName
    *          the name of the output. This will be used when accessing the
    *          output in the {@link LogicalIOProcessor}
-   * @param outputDescriptor
-   * @param outputCommitterDescriptor
-   *          Specify committer to be used for the output Can be null. After all
-   *          tasks in the vertex (or in the DAG) have completed, the committer
-   *          (if specified) is invoked to commit the outputs. Commit is a data
-   *          sink specific operation that usually determines the visibility of
-   *          the output to external observers. E.g. moving output files from
-   *          temporary dirs to the real output dir. When there are multiple
-   *          executions of a task, the commit process also helps decide which
-   *          execution will be included in the final output. Users should
-   *          consider whether their application or data sink need a commit
-   *          operation.
+   * @param dataSinkDescriptor
+   *          the {@link DataSinkDescriptor} for this output
    * @return this Vertex
    */
-  public Vertex addDataSink(String outputName, OutputDescriptor outputDescriptor,
-      @Nullable OutputCommitterDescriptor outputCommitterDescriptor) {
+  public Vertex addDataSink(String outputName, DataSinkDescriptor dataSinkDescriptor) {
     additionalOutputs
         .add(new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(
-            outputName, outputDescriptor, outputCommitterDescriptor));
+            outputName, dataSinkDescriptor.getOutputDescriptor(),
+            dataSinkDescriptor.getOutputCommitterDescriptor()));
+    this.dataSinks.add(dataSinkDescriptor);
     return this;
   }
   
@@ -347,13 +329,29 @@ public class Vertex {
     outputEdges.add(edge);
   }
   
+  /**
+   * Get the input vertices for this vertex
+   * @return List of input vertices
+   */
   public List<Vertex> getInputVertices() {
     return Collections.unmodifiableList(inputVertices);
   }
 
+  /**
+   * Get the output vertices for this vertex
+   * @return List of output vertices
+   */
   public List<Vertex> getOutputVertices() {
     return Collections.unmodifiableList(outputVertices);
   }
+  
+  List<DataSourceDescriptor> getDataSources() {
+    return dataSources;
+  }
+  
+  List<DataSinkDescriptor> getDataSinks() {
+    return dataSinks;
+  }
 
   List<Edge> getInputEdges() {
     return inputEdges;

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
index 991350b..894d649 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -22,8 +22,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import javax.annotation.Nullable;
-
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -90,11 +88,10 @@ public class VertexGroup {
    * Add an common data sink to the group of vertices.
    * Refer to {@link Vertex#addDataSink(String, OutputDescriptor, OutputCommitterDescriptor)}
    */
-  public VertexGroup addDataSink(String outputName, OutputDescriptor outputDescriptor,
-      @Nullable OutputCommitterDescriptor committerDescriptor) {
+  public VertexGroup addDataSink(String outputName, DataSinkDescriptor dataSinkDescriptor) {
     RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> leafOutput = 
         new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(outputName,
-        outputDescriptor, committerDescriptor);
+        dataSinkDescriptor.getOutputDescriptor(), dataSinkDescriptor.getOutputCommitterDescriptor());
     this.groupInfo.outputs.add(outputName);
     
     // also add output to its members

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 91c04fc..b03a23b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -482,7 +482,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v2.addDataSource("v1", new InputDescriptor(), null);
+    v2.addDataSource("v1", new DataSourceDescriptor(null, null, null));
     
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -506,7 +506,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addDataSink("v2", new OutputDescriptor(), null);
+    v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
     
     Edge e1 = new Edge(v1, v2,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -530,7 +530,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addDataSink("v2", new OutputDescriptor(), null);
+    v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
     
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
@@ -547,7 +547,7 @@ public class TestDAGVerify {
         new ProcessorDescriptor("MapProcessor"),
         dummyTaskCount, dummyTaskResource);
     
-    v1.addDataSource("v2", new InputDescriptor(), null);
+    v1.addDataSource("v2", new DataSourceDescriptor(null, null, null));
     
     DAG dag = new DAG("testDag");
     dag.addVertex(v1);
@@ -606,7 +606,7 @@ public class TestDAGVerify {
     DAG dag = new DAG("testDag");
     VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addDataSink("uvOut", outDesc, null);
+    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
     
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 
@@ -663,7 +663,7 @@ public class TestDAGVerify {
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addDataSink("uvOut", outDesc, null);
+    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
     
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -745,7 +745,7 @@ public class TestDAGVerify {
     String groupName1 = "uv12";
     VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor();
-    uv12.addDataSink("uvOut", outDesc, null);
+    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
     
     String groupName2 = "uv23";
     VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -880,8 +880,8 @@ public class TestDAGVerify {
         .getBytes());
     InputDescriptor inputDescriptor2 = new InputDescriptor("input2").setUserPayload("inputBytes"
         .getBytes());
-    v1.addDataSource("input1", inputDescriptor1, null);
-    v1.addDataSource("input2", inputDescriptor2, null);
+    v1.addDataSource("input1", new DataSourceDescriptor(inputDescriptor1, null, null));
+    v1.addDataSource("input2", new DataSourceDescriptor(inputDescriptor2, null, null));
 
     dag.addVertex(v1);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index c7fb4b5..6cb5d56 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -361,8 +362,8 @@ public class TestDAGImpl {
         TotalCountingOutputCommitter.class.getName());
     org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
     OutputDescriptor outDesc = new OutputDescriptor("output.class");
-    uv12.addDataSink("uvOut", outDesc, ocd);
-    v3.addDataSink("uvOut", outDesc, ocd);
+    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+    v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
     
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER, 

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 5f0f1c9..7ce0de3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -31,6 +31,8 @@ import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
@@ -54,8 +56,8 @@ public class TestDAGUtils {
     org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
         new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"),
         dummyTaskCount, dummyTaskResource);
-    v1.addDataSource("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"),
-        null);
+    v1.addDataSource("input1", new DataSourceDescriptor(new InputDescriptor(
+        "input.class").setHistoryText("input HistoryText"), null, null));
     org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
         new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"),
         dummyTaskCount, dummyTaskResource);
@@ -69,8 +71,8 @@ public class TestDAGUtils {
     OutputDescriptor outDesc = new OutputDescriptor("output.class")
         .setHistoryText("uvOut HistoryText");
     OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(OutputCommitter.class.getName());
-    uv12.addDataSink("uvOut", outDesc, ocd);
-    v3.addDataSink("uvOut", outDesc, ocd);
+    uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+    v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
 
     GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
         new EdgeProperty(DataMovementType.SCATTER_GATHER,

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 465fec2..8fe8e88 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -54,6 +54,8 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -200,10 +202,13 @@ public class FilterLinesByWord extends Configured implements Tool {
     // Configure the Input for stage1
     Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
         : MRInputAMSplitGenerator.class;
-    stage1Vertex.addDataSource("MRInput",
-        new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
-        (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName())));
+    stage1Vertex.addDataSource(
+        "MRInput",
+        new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class
+            .getName()).setUserPayload(MRHelpers.createMRInputPayload(
+            stage1Payload, null)), 
+            (initializerClazz == null ? null
+            : new InputInitializerDescriptor(initializerClazz.getName())), null));
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
@@ -216,7 +221,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
         .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf));
     OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
-    stage2Vertex.addDataSink("MROutput", od, ocd);
+    stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
 
     UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 83be555..822061d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -186,10 +188,12 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     // Configure the Input for stage1
     Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
         : MRInputAMSplitGenerator.class;
-    stage1Vertex.addDataSource("MRInput",
-        new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
-            (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName())));
+    stage1Vertex.addDataSource(
+        "MRInput",
+        new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class
+            .getName()).setUserPayload(MRHelpers.createMRInputPayload(
+            stage1Payload, null)), (initializerClazz == null ? null
+            : new InputInitializerDescriptor(initializerClazz.getName())), null));
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
@@ -199,10 +203,11 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2
-    stage2Vertex.addDataSink("MROutput",
-        new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
-            .createUserPayloadFromConf(stage2Conf)),
-            new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+    stage2Vertex.addDataSink(
+        "MROutput",
+        new DataSinkDescriptor(new OutputDescriptor(MROutput.class.getName())
+            .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf)),
+            new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
 
     UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index d5b0eb9..d83aa34 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -23,9 +23,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.net.URI;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -36,7 +33,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -44,15 +40,12 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -185,7 +178,6 @@ public class IntersectDataGen extends Configured implements Tool {
 
     DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
         largeOutSize, smallOutSize);
-    setupURIsForCredentials(dag, largeOutPath, smallOutPath, expectedOutputPath);
 
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
@@ -207,22 +199,18 @@ public class IntersectDataGen extends Configured implements Tool {
 
     DAG dag = new DAG("IntersectDataGen");
 
-    byte[] streamOutputPayload = createPayloadForOutput(largeOutPath, tezConf);
-    byte[] hashOutputPayload = createPayloadForOutput(smallOutPath, tezConf);
-    byte[] expectedOutputPayload = createPayloadForOutput(expectedOutputPath, tezConf);
-
     Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
         GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
         largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf));
-    genDataVertex.addDataSink(STREAM_OUTPUT_NAME,
-        new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload),
-        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
-    genDataVertex.addDataSink(HASH_OUTPUT_NAME,
-        new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload),
-        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
-    genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME,
-        new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload),
-        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+    genDataVertex.addDataSink(STREAM_OUTPUT_NAME, 
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, largeOutPath.toUri().toString()).create());
+    genDataVertex.addDataSink(HASH_OUTPUT_NAME, 
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, smallOutPath.toUri().toString()).create());
+    genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME, 
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, expectedOutputPath.toUri().toString()).create());
 
     dag.addVertex(genDataVertex);
 
@@ -351,16 +339,6 @@ public class IntersectDataGen extends Configured implements Tool {
 
   }
 
-  private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
-    List<URI> uris = new LinkedList<URI>();
-    for (Path path : paths) {
-      FileSystem fs = path.getFileSystem(getConf());
-      Path qPath = fs.makeQualified(path);
-      uris.add(qPath.toUri());
-    }
-    dag.addURIsForCredentials(uris);
-  }
-
   private int checkOutputDirectory(FileSystem fs, Path path) throws IOException {
     if (fs.exists(path)) {
       System.err.println("Output directory: " + path + " already exists");
@@ -369,10 +347,4 @@ public class IntersectDataGen extends Configured implements Tool {
     return 0;
   }
 
-  private byte[] createPayloadForOutput(Path outputPath, Configuration srcConf) throws IOException {
-    Configuration conf = new Configuration(srcConf);
-    conf.set(FileOutputFormat.OUTDIR, outputPath.toUri().toString());
-    byte[] payload = MROutput.createUserPayload(conf, TextOutputFormat.class.getName(), true);
-    return payload;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index ef7643a..9a28014 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -19,10 +19,8 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
+
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -33,9 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -44,18 +40,12 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -168,7 +158,6 @@ public class IntersectExample extends Configured implements Tool {
     }
 
     DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
-    setupURIsForCredentials(dag, streamInputPath, hashInputPath, outputPath);
 
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
@@ -185,18 +174,6 @@ public class IntersectExample extends Configured implements Tool {
       int numPartitions) throws IOException {
     DAG dag = new DAG("IntersectExample");
 
-    // Configuration for src1
-    Configuration streamInputConf = new Configuration(tezConf);
-    streamInputConf.set(FileInputFormat.INPUT_DIR, streamPath.toUri().toString());
-    byte[] streamInputPayload = MRInput.createUserPayload(streamInputConf,
-        TextInputFormat.class.getName(), true, false);
-
-    // Configuration for src2
-    Configuration hashInputConf = new Configuration(tezConf);
-    hashInputConf.set(FileInputFormat.INPUT_DIR, hashPath.toUri().toString());
-    byte[] hashInputPayload = MRInput.createUserPayload(hashInputConf,
-        TextInputFormat.class.getName(), true, false);
-
     // Configuration for intermediate output - shared by Vertex1 and Vertex2
     // This should only be setting selective keys from the underlying conf. Fix after there's a
     // better mechanism to configure the IOs.
@@ -206,32 +183,26 @@ public class IntersectExample extends Configured implements Tool {
             .newBuilder(Text.class.getName(), NullWritable.class.getName(),
                 HashPartitioner.class.getName(), null).build();
 
-    Configuration finalOutputConf = new Configuration(tezConf);
-    finalOutputConf.set(FileOutputFormat.OUTDIR, outPath.toUri().toString());
-    byte[] finalOutputPayload = MROutput.createUserPayload(finalOutputConf,
-        TextOutputFormat.class.getName(), true);
-
     // Change the way resources are setup - no MRHelpers
-    Vertex streamFileVertex = new Vertex("partitioner1",
-        new ProcessorDescriptor(ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addDataSource("streamfile",
-        new InputDescriptor(MRInput.class.getName())
-            .setUserPayload(streamInputPayload), 
-            new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+    Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(
+        ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+        "streamfile",
+        MRInput
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                streamPath.toUri().toString()).groupSplitsInAM(false).create());
 
     Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
-        ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addDataSource("hashfile",
-        new InputDescriptor(MRInput.class.getName())
-            .setUserPayload(hashInputPayload), 
-            new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+        ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+        "hashfile",
+        MRInput
+            .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+                hashPath.toUri().toString()).groupSplitsInAM(false).create());
 
     Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
         IntersectProcessor.class.getName()), numPartitions,
         MRHelpers.getReduceResource(tezConf)).addDataSink("finalOutput",
-        new OutputDescriptor(MROutput.class.getName())
-            .setUserPayload(finalOutputPayload), 
-        new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+        MROutput.createConfigurer(new Configuration(tezConf),
+            TextOutputFormat.class, outPath.toUri().toString()).create());
 
     Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
 
@@ -242,16 +213,6 @@ public class IntersectExample extends Configured implements Tool {
     return dag;
   }
 
-  private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
-    List<URI> uris = new LinkedList<URI>();
-    for (Path path : paths) {
-      FileSystem fs = path.getFileSystem(getConf());
-      Path qPath = fs.makeQualified(path);
-      uris.add(qPath.toUri());
-    }
-    dag.addURIsForCredentials(uris);
-  }
-
   // private void obtainTokens(Credentials credentials, Path... paths) throws IOException {
   // TokenCache.obtainTokensForNamenodes(credentials, paths, getConf());
   // }

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index b0a5c6c..ac671a4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -19,19 +19,14 @@
 package org.apache.tez.mapreduce.examples;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.LinkedList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -41,8 +36,6 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -50,7 +43,6 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.examples.IntersectExample.ForwardingProcessor;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
@@ -160,7 +152,6 @@ public class IntersectValidate extends Configured implements Tool {
     Path rhsPath = new Path(rhsDir);
 
     DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
-    setupURIsForCredentials(dag, lhsPath, rhsPath);
 
     tezSession.waitTillReady();
     DAGClient dagClient = tezSession.submitDAG(dag);
@@ -191,18 +182,6 @@ public class IntersectValidate extends Configured implements Tool {
       throws IOException {
     DAG dag = new DAG("IntersectValidate");
 
-    // Configuration for src1
-    Configuration lhsInputConf = new Configuration(tezConf);
-    lhsInputConf.set(FileInputFormat.INPUT_DIR, lhs.toUri().toString());
-    byte[] streamInputPayload = MRInput.createUserPayload(lhsInputConf,
-        TextInputFormat.class.getName(), true, false);
-
-    // Configuration for src2
-    Configuration rhsInputConf = new Configuration(tezConf);
-    rhsInputConf.set(FileInputFormat.INPUT_DIR, rhs.toUri().toString());
-    byte[] hashInputPayload = MRInput.createUserPayload(rhsInputConf,
-        TextInputFormat.class.getName(), true, false);
-
     // Configuration for intermediate output - shared by Vertex1 and Vertex2
     // This should only be setting selective keys from the underlying conf. Fix after there's a
     // better mechanism to configure the IOs.
@@ -213,15 +192,19 @@ public class IntersectValidate extends Configured implements Tool {
     // Change the way resources are setup - no MRHelpers
     Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
         ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addDataSource("lhs", new InputDescriptor(
-        MRInput.class.getName()).setUserPayload(streamInputPayload),
-        new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+        MRHelpers.getMapResource(tezConf)).addDataSource(
+        "lhs",
+        MRInput
+        .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+            lhs.toUri().toString()).groupSplitsInAM(false).create());
 
     Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
         ForwardingProcessor.class.getName()), -1,
-        MRHelpers.getMapResource(tezConf)).addDataSource("rhs", new InputDescriptor(
-        MRInput.class.getName()).setUserPayload(hashInputPayload),
-        new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+        MRHelpers.getMapResource(tezConf)).addDataSource(
+        "rhs",
+        MRInput
+        .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+            rhs.toUri().toString()).groupSplitsInAM(false).create());
 
     Vertex intersectValidateVertex = new Vertex("intersectvalidate",
         new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),
@@ -278,13 +261,4 @@ public class IntersectValidate extends Configured implements Tool {
     }
   }
 
-  private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
-    List<URI> uris = new LinkedList<URI>();
-    for (Path path : paths) {
-      FileSystem fs = path.getFileSystem(getConf());
-      Path qPath = fs.makeQualified(path);
-      uris.add(qPath.toUri());
-    }
-    dag.addURIsForCredentials(uris);
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index ab75441..7279136 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -63,20 +63,21 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
@@ -479,6 +480,8 @@ public class MRRSleepJob extends Configured implements Tool {
       MRHelpers.doJobClientMagic(finalReduceConf);
     }
 
+    DataSourceDescriptor dataSource = null;
+    List<TaskLocationHint> taskLocHint = null;
     InputSplitInfo inputSplitInfo = null;
     if (!generateSplitsInAM) {
       if (writeSplitsToDFS) {
@@ -491,19 +494,19 @@ public class MRRSleepJob extends Configured implements Tool {
         } catch (ClassNotFoundException e) {
           throw new TezUncheckedException("Failed to generate input splits", e);
         }
-      } else {
-        try {
-          LOG.info("Creating in-mem splits");
-          inputSplitInfo = MRHelpers.generateInputSplitsToMem(mapStageConf);
-        } catch (ClassNotFoundException e) {
-          throw new TezUncheckedException("Could not generate input splits", e);
-        } catch (InterruptedException e) {
-          throw new TezUncheckedException("Could not generate input splits", e);
+        if (inputSplitInfo.getCredentials() != null) {
+          this.credentials.addAll(inputSplitInfo.getCredentials());
         }
+        taskLocHint = inputSplitInfo.getTaskLocationHints();
+        byte[] mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
+        InputDescriptor id = new InputDescriptor(MRInput.class.getName()).setUserPayload(mapInputPayload);
+        dataSource = new DataSourceDescriptor(id, null, null);
+      } else {
+        dataSource = MRInput.createConfigurer(mapStageConf, SleepInputFormat.class).
+            generateSplitsInAM(false).create();
       }
-      if (inputSplitInfo.getCredentials() != null) {
-        this.credentials.addAll(inputSplitInfo.getCredentials());
-      }
+    } else {
+      dataSource = MRInput.createConfigurer(mapStageConf, SleepInputFormat.class).create();
     }
 
     DAG dag = new DAG("MRRSleepJob");
@@ -533,24 +536,15 @@ public class MRRSleepJob extends Configured implements Tool {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     
-    byte[] mapInputPayload = null;
     byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
-    if (writeSplitsToDFS || generateSplitsInAM) {
-      mapInputPayload = MRHelpers.createMRInputPayload(mapUserPayload, null);
-    } else {
-      mapInputPayload = MRHelpers.createMRInputPayload(
-          mapUserPayload, inputSplitInfo.getSplitsProto());
-    }
     int numTasks = generateSplitsInAM ? -1 : numMapper;
     
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapUserPayload),
         numTasks, MRHelpers.getMapResource(mapStageConf));
-    if (!generateSplitsInAM) {
-      mapVertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
-    }
-    
+
     if (writeSplitsToDFS) {
+      mapVertex.setLocationHint(new VertexLocationHint(taskLocHint));
       Map<String, LocalResource> mapLocalResources = new HashMap<String, LocalResource>();
       mapLocalResources.putAll(commonLocalResources);
       MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
@@ -560,17 +554,7 @@ public class MRRSleepJob extends Configured implements Tool {
       mapVertex.setTaskLocalFiles(commonLocalResources);
     }
 
-    if (generateSplitsInAM) {
-      MRHelpers.addMRInput(mapVertex, mapInputPayload, 
-          new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
-    } else {
-      if (writeSplitsToDFS) {
-        MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
-      } else {
-        MRHelpers.addMRInput(mapVertex, mapInputPayload, 
-            new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()));
-      }
-    }
+    mapVertex.addDataSource("MRInput", dataSource);
     vertices.add(mapVertex);
 
     if (iReduceStagesCount > 0

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 4455bb4..259a314 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -30,21 +30,18 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.GroupInputEdge;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -52,10 +49,9 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -174,27 +170,24 @@ public class UnionExample {
     
     int numMaps = -1;
     Configuration inputConf = new Configuration(tezConf);
-    inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
-    InputDescriptor id = new InputDescriptor(MRInput.class.getName())
-        .setUserPayload(MRInput.createUserPayload(inputConf,
-            TextInputFormat.class.getName(), true, true));
+    MRInputConfigurer configurer = MRInput.createConfigurer(inputConf, TextInputFormat.class,
+        inputPath);
+    DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
 
     Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    InputInitializerDescriptor iid = 
-        new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName());
-    mapVertex1.addDataSource("MRInput", id, iid);
+    mapVertex1.addDataSource("MRInput", dataSource);
 
     Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex2.addDataSource("MRInput", id, iid);
+    mapVertex2.addDataSource("MRInput", dataSource);
 
     Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
         TokenProcessor.class.getName()),
         numMaps, MRHelpers.getMapResource(tezConf));
-    mapVertex3.addDataSource("MRInput", id, iid);
+    mapVertex3.addDataSource("MRInput", dataSource);
 
     Vertex checkerVertex = new Vertex("checker",
         new ProcessorDescriptor(
@@ -202,28 +195,21 @@ public class UnionExample {
                 1, MRHelpers.getReduceResource(tezConf));
 
     Configuration outputConf = new Configuration(tezConf);
-    outputConf.set(FileOutputFormat.OUTDIR, outputPath);
-    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
-      .setUserPayload(MROutput.createUserPayload(
-          outputConf, TextOutputFormat.class.getName(), true));
-    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
-    checkerVertex.addDataSink("union", od, ocd);
+    DataSinkDescriptor od = MROutput.createConfigurer(outputConf,
+        TextOutputFormat.class, outputPath).create();
+    checkerVertex.addDataSink("union", od);
+    
 
     Configuration allPartsConf = new Configuration(tezConf);
-    allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
-    OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
-      .setUserPayload(MROutput.createUserPayload(
-          allPartsConf, TextOutputFormat.class.getName(), true));
-    checkerVertex.addDataSink("all-parts", od2, ocd);
-
-    Configuration partsConf = new Configuration(tezConf);
-    partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");
-    
+    DataSinkDescriptor od2 = MROutput.createConfigurer(allPartsConf,
+        TextOutputFormat.class, outputPath + "-all-parts").create();
+    checkerVertex.addDataSink("all-parts", od2);
+
+    Configuration partsConf = new Configuration(tezConf);    
+    DataSinkDescriptor od1 = MROutput.createConfigurer(partsConf,
+        TextOutputFormat.class, outputPath + "-parts").create();
     VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
-    OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName())
-      .setUserPayload(MROutput.createUserPayload(
-          partsConf, TextOutputFormat.class.getName(), true));
-    unionVertex.addDataSink("parts", od1, ocd);
+    unionVertex.addDataSink("parts", od1);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 0de2b04..ab5ac57 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -29,9 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -40,18 +38,14 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -123,29 +117,20 @@ public class WordCount extends Configured implements Tool {
       Map<String, LocalResource> localResources, Path stagingDir,
       String inputPath, String outputPath) throws IOException {
 
-    Configuration inputConf = new Configuration(tezConf);
-    inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
-    InputDescriptor id = new InputDescriptor(MRInput.class.getName())
-        .setUserPayload(MRInput.createUserPayload(inputConf,
-            TextInputFormat.class.getName(), true, true));
-    InputInitializerDescriptor iid = new InputInitializerDescriptor(
-        MRInputAMSplitGenerator.class.getName());
-
-    Configuration outputConf = new Configuration(tezConf);
-    outputConf.set(FileOutputFormat.OUTDIR, outputPath);
-    OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
-      .setUserPayload(MROutput.createUserPayload(
-          outputConf, TextOutputFormat.class.getName(), true));
-    OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
+    DataSourceDescriptor dataSource = MRInput.createConfigurer(new Configuration(tezConf),
+        TextInputFormat.class, inputPath).create();
+
+    DataSinkDescriptor dataSink = MROutput.createConfigurer(new Configuration(tezConf),
+        TextOutputFormat.class, outputPath).create();
 
     Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
         TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
-    tokenizerVertex.addDataSource("MRInput", id, iid);
+    tokenizerVertex.addDataSource("MRInput", dataSource);
 
     Vertex summerVertex = new Vertex("summer",
         new ProcessorDescriptor(
             SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
-    summerVertex.addDataSink("MROutput", od, ocd);
+    summerVertex.addDataSink("MROutput", dataSink);
 
     OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
         .newBuilder(Text.class.getName(), IntWritable.class.getName(),

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index c6b2fe4..48679c6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -63,6 +63,8 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
@@ -983,14 +985,15 @@ public class MRHelpers {
       InputInitializerDescriptor initClazz) {
     InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName())
         .setUserPayload(userPayload);
-    vertex.addDataSource("MRInput", id, initClazz);
+    vertex.addDataSource("MRInput", new DataSourceDescriptor(id, initClazz, null));
   }
 
   @Private
   public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) {
     OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
         .setUserPayload(userPayload);
-    vertex.addDataSink("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+    vertex.addDataSink("MROutput", new DataSinkDescriptor(od,
+        new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index e36eb4d..bcb8f37 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,6 +18,7 @@
 package org.apache.tez.mapreduce.input;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
@@ -26,13 +27,25 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.base.MRInputBase;
@@ -48,6 +61,7 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * {@link MRInput} is an {@link Input} which provides key/values pairs
@@ -61,6 +75,208 @@ import com.google.common.base.Preconditions;
 
 public class MRInput extends MRInputBase {
 
+  /**
+   * Helper class to configure {@link MRInput}
+   *
+   */
+  public static class MRInputConfigurer {
+    final Configuration conf;
+    final Class<?> inputFormat;
+    boolean useNewApi;
+    boolean groupSplitsInAM = true;
+    boolean generateSplitsInAM = true;
+    String inputClassName = MRInput.class.getName();
+    boolean getCredentialsForSourceFilesystem = true;
+    String inputPaths = null;
+    
+    private MRInputConfigurer(Configuration conf, Class<?> inputFormat) {
+      this.conf = conf;
+      this.inputFormat = inputFormat;
+      if (org.apache.hadoop.mapred.InputFormat.class.isAssignableFrom(inputFormat)) {
+        useNewApi = false;
+      } else if(org.apache.hadoop.mapreduce.InputFormat.class.isAssignableFrom(inputFormat)) {
+        useNewApi = true;
+      } else {
+        throw new TezUncheckedException("inputFormat must be assignable from either " +
+            "org.apache.hadoop.mapred.InputFormat or " +
+            "org.apache.hadoop.mapreduce.InputFormat" +
+            " Given: " + inputFormat.getName());
+      }
+    }
+    
+    MRInputConfigurer setInputClassName(String className) {
+      this.inputClassName = className;
+      return this;
+    }
+
+    private MRInputConfigurer setInputPaths(String inputPaths) {
+      if (!(org.apache.hadoop.mapred.FileInputFormat.class.isAssignableFrom(inputFormat) || 
+          FileInputFormat.class.isAssignableFrom(inputFormat))) {
+        throw new TezUncheckedException("When setting inputPaths the inputFormat must be " + 
+            "assignable from either org.apache.hadoop.mapred.FileInputFormat or " +
+            "org.apache.hadoop.mapreduce.lib.input.FileInputFormat. " +
+            "Otherwise use the non-path configurer." +
+            " Given: " + inputFormat.getName());
+      }
+      conf.set(FileInputFormat.INPUT_DIR, inputPaths);
+      this.inputPaths = inputPaths;
+      return this;
+    }
+    
+    /**
+     * Set whether splits should be grouped in the Tez App Master (default true)
+     * @param value whether to group splits in the AM or not
+     * @return {@link MRInputConfigurer}
+     */
+    public MRInputConfigurer groupSplitsInAM(boolean value) {
+      groupSplitsInAM = value;
+      return this;
+    }
+    
+    /**
+     * Set whether splits should be generated in the Tez App Master (default true)
+     * @param value whether to generate splits in the AM or not
+     * @return {@link MRInputConfigurer}
+     */
+    public MRInputConfigurer generateSplitsInAM(boolean value) {
+      generateSplitsInAM = value;
+      return this;
+    }
+    
+    /**
+     * Get the credentials for the inputPaths from their {@link FileSystem}s
+     * Use the method to turn this off when not using a {@link FileSystem}
+     * or when {@link Credentials} are not supported
+     * @param value whether to get credentials or not. (true by default)
+     * @return {@link MRInputConfigurer}
+     */
+    public MRInputConfigurer getCredentialsForSourceFileSystem(boolean value) {
+      getCredentialsForSourceFilesystem = value;
+      return this;
+    }
+        
+    /**
+     * Create the {@link DataSourceDescriptor}
+     * @return {@link DataSourceDescriptor}
+     */
+    public DataSourceDescriptor create() {
+      try {
+        if (generateSplitsInAM) {
+          return createGeneratorDataSource();
+        } else {
+          return createDistributorDataSource();
+        }
+      } catch (Exception e) {
+        throw new TezUncheckedException(e);
+      } 
+    }
+    
+    private DataSourceDescriptor createDistributorDataSource() throws IOException {
+      Configuration inputConf = new JobConf(conf);
+      InputSplitInfo inputSplitInfo;
+      try {
+        inputSplitInfo = MRHelpers.generateInputSplitsToMem(inputConf);
+      } catch (Exception e) {
+        throw new TezUncheckedException(e);
+      }
+      inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
+      if (useNewApi) {
+        inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, inputFormat.getName());
+      } else {
+        inputConf.set("mapred.input.format.class", inputFormat.getName());
+      }
+      MRHelpers.translateVertexConfToTez(inputConf);
+      MRHelpers.doJobClientMagic(inputConf);
+      byte[] payload = MRHelpers.createMRInputPayload(inputConf, inputSplitInfo.getSplitsProto());
+      Credentials credentials = null;
+      if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
+        credentials = inputSplitInfo.getCredentials();
+      }
+      return new DataSourceDescriptor(
+          new InputDescriptor(inputClassName).setUserPayload(payload),
+          new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()),
+          inputSplitInfo.getNumTasks(), credentials, 
+          new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
+    }
+    
+    private DataSourceDescriptor createGeneratorDataSource() throws IOException {
+      Configuration inputConf = new JobConf(conf);
+      String wrappedInputFormatClassName = null;
+      String configInputFormatClassName = null;
+      if (groupSplitsInAM) {
+        wrappedInputFormatClassName = inputFormat.getName();
+        if (useNewApi) {
+          configInputFormatClassName = TezGroupedSplitsInputFormat.class.getName();
+        } else {
+          configInputFormatClassName = 
+              org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.class.getName();
+        }
+      } else {
+        wrappedInputFormatClassName = null;
+        configInputFormatClassName = inputFormat.getName();
+      }
+      inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
+      if (useNewApi) {
+        inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, configInputFormatClassName);
+      } else {
+        inputConf.set("mapred.input.format.class", configInputFormatClassName);
+      }
+      MRHelpers.translateVertexConfToTez(inputConf);
+      MRHelpers.doJobClientMagic(inputConf);
+      
+      Credentials credentials = null;
+      if (getCredentialsForSourceFilesystem && inputPaths != null) {
+        try {
+          List<URI> uris = Lists.newLinkedList();
+          for (String inputPath : inputPaths.split(",")) {
+            Path path = new Path(inputPath);
+            FileSystem fs;
+            fs = path.getFileSystem(conf);
+            Path qPath = fs.makeQualified(path);
+            uris.add(qPath.toUri());
+          }
+          credentials = new Credentials();
+          TezClientUtils.addFileSystemCredentialsFromURIs(uris, credentials, conf);
+        } catch (IOException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
+
+      byte[] payload = null;
+      if (groupSplitsInAM) {
+        payload = MRHelpers.createMRInputPayloadWithGrouping(inputConf,
+            wrappedInputFormatClassName);
+      } else {
+        payload = MRHelpers.createMRInputPayload(inputConf, null);
+      }
+      return new DataSourceDescriptor(
+          new InputDescriptor(inputClassName).setUserPayload(payload),
+          new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()), credentials);
+    }
+  }
+  
+  /**
+   * Create an {@link MRInputConfigurer}
+   * @param conf Configuration for the {@link MRInput}
+   * @param inputFormat InputFormat derived class
+   * @return {@link MRInputConfigurer}
+   */
+  public static MRInputConfigurer createConfigurer(Configuration conf, Class<?> inputFormat) {
+    return new MRInputConfigurer(conf, inputFormat);
+  }
+
+  /**
+   * Create an {@link MRInputConfigurer} for a FileInputFormat
+   * @param conf Configuration for the {@link MRInput}
+   * @param inputFormat FileInputFormat derived class
+   * @param inputPaths Comma separated input paths
+   * @return {@link MRInputConfigurer}
+   */
+  public static MRInputConfigurer createConfigurer(Configuration conf, Class<?> inputFormat,
+      String inputPaths) {
+    return new MRInputConfigurer(conf, inputFormat).setInputPaths(inputPaths);
+  }
+  
   private static final Log LOG = LogFactory.getLog(MRInput.class);
   
   private final ReentrantLock rrLock = new ReentrantLock();
@@ -83,54 +299,6 @@ public class MRInput extends MRInputBase {
     super(inputContext, numPhysicalInputs);
   }
 
-  /**
-   * Helper API to generate the user payload for the MRInput and
-   * MRInputAMSplitGenerator (if used). The InputFormat will be invoked by Tez
-   * at DAG runtime to generate the input splits.
-   * 
-   * @param conf
-   *          Configuration for the InputFormat
-   * @param inputFormatClassName
-   *          Name of the class of the InputFormat
-   * @param useNewApi
-   *          use new mapreduce API or old mapred API
-   * @param groupSplitsInAM
-   *          do grouping of splits in the AM. If true then splits generated by
-   *          the InputFormat will be grouped in the AM based on available
-   *          resources, locality etc. This option may be set to true only when
-   *          using MRInputAMSplitGenerator as the initializer class in
-   *          {@link Vertex#addDataSource(String, org.apache.tez.dag.api.InputDescriptor, 
-   *          org.apache.tez.dag.api.InputInitializerDescriptor)}
-   * @return returns the user payload to be set on the InputDescriptor of
-   *         MRInput
-   * @throws IOException
-   */
-  public static byte[] createUserPayload(Configuration conf,
-      String inputFormatClassName, boolean useNewApi, boolean groupSplitsInAM)
-      throws IOException {
-    Configuration inputConf = new JobConf(conf);
-    String wrappedInputFormatClassName = null;
-    String configInputFormatClassName = null;
-    if (groupSplitsInAM) {
-      wrappedInputFormatClassName = inputFormatClassName;
-      configInputFormatClassName = TezGroupedSplitsInputFormat.class.getName();
-    } else {
-      wrappedInputFormatClassName = null;
-      configInputFormatClassName = inputFormatClassName;
-    }
-    inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
-        configInputFormatClassName);
-    inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
-    MRHelpers.translateVertexConfToTez(inputConf);
-    MRHelpers.doJobClientMagic(inputConf);
-    if (groupSplitsInAM) {
-      return MRHelpers.createMRInputPayloadWithGrouping(inputConf,
-          wrappedInputFormatClassName);
-    } else {
-      return MRHelpers.createMRInputPayload(inputConf, null);
-    }
-  }
-  
   @Override
   public List<Event> initialize() throws IOException {
     super.initialize();