You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/05 22:05:40 UTC
[1/2] TEZ-1317. Simplify MRinput/MROutput configuration (bikas)
Repository: tez
Updated Branches:
refs/heads/master 5ae48c6ef -> 1831fda98
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index d999780..d6e2989 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -27,8 +27,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
@@ -42,6 +44,30 @@ public class MRInputLegacy extends MRInput {
private ReentrantLock eventLock = new ReentrantLock();
private Condition eventCondition = eventLock.newCondition();
+
+ /**
+ * Create an {@link MRInputConfigurer}
+ * @param conf Configuration for the {@link MRInputLegacy}
+ * @param inputFormat InputFormat derived class
+ * @return {@link MRInputConfigurer}
+ */
+ public static MRInputConfigurer createtConfigurer(Configuration conf, Class<?> inputFormat) {
+ return MRInput.createConfigurer(conf, inputFormat).setInputClassName(MRInputLegacy.class.getName());
+ }
+
+ /**
+ * Create an {@link MRInputConfigurer} for a FileInputFormat
+ * @param conf Configuration for the {@link MRInputLegacy}
+ * @param inputFormat FileInputFormat derived class
+ * @param inputPaths Comma separated input paths
+ * @return {@link MRInputConfigurer}
+ */
+ public static MRInputConfigurer createConfigurer(Configuration conf, Class<?> inputFormat,
+ String inputPaths) {
+ return MRInput.createConfigurer(conf, inputFormat, inputPaths).setInputClassName(
+ MRInputLegacy.class.getName());
+ }
+
public MRInputLegacy(TezInputContext inputContext, int numPhysicalInputs) {
super(inputContext, numPhysicalInputs);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index bda3f1b..709a2c4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.output;
import java.io.IOException;
import java.text.NumberFormat;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,11 +36,18 @@ import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.OutputCommitterDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -51,7 +59,135 @@ import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
+
public class MROutput extends AbstractLogicalOutput {
+
+ /**
+ * Helper class to configure {@link MROutput}
+ *
+ */
+ public static class MROutputConfigurer {
+ final Configuration conf;
+ final Class<?> outputFormat;
+ boolean useNewApi;
+ boolean getCredentialsForSinkFilesystem = true;
+ String outputPath;
+
+ private MROutputConfigurer(Configuration conf, Class<?> outputFormat) {
+ this.conf = conf;
+ this.outputFormat = outputFormat;
+ if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom(outputFormat)) {
+ useNewApi = false;
+ } else if(org.apache.hadoop.mapreduce.OutputFormat.class.isAssignableFrom(outputFormat)) {
+ useNewApi = true;
+ } else {
+ throw new TezUncheckedException("outputFormat must be assignable from either " +
+ "org.apache.hadoop.mapred.OutputFormat or " +
+ "org.apache.hadoop.mapreduce.OutputFormat" +
+ " Given: " + outputFormat.getName());
+ }
+ }
+
+ private MROutputConfigurer setOutputPath(String outputPath) {
+ if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom(outputFormat) ||
+ FileOutputFormat.class.isAssignableFrom(outputFormat))) {
+ throw new TezUncheckedException("When setting outputPath the outputFormat must " +
+ "be assignable from either org.apache.hadoop.mapred.FileOutputFormat or " +
+ "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat. " +
+ "Otherwise use the non-path configurer." +
+ " Given: " + outputFormat.getName());
+ }
+ conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, outputPath);
+ this.outputPath = outputPath;
+ return this;
+ }
+
+ /**
+ * Create the {@link DataSinkDescriptor}
+ * @return {@link DataSinkDescriptor}
+ */
+ public DataSinkDescriptor create() {
+ Credentials credentials = null;
+ if (getCredentialsForSinkFilesystem && outputPath != null) {
+ try {
+ Path path = new Path(outputPath);
+ FileSystem fs;
+ fs = path.getFileSystem(conf);
+ Path qPath = fs.makeQualified(path);
+ credentials = new Credentials();
+ TezClientUtils.addFileSystemCredentialsFromURIs(Collections.singletonList(qPath.toUri()),
+ credentials, conf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
+ return new DataSinkDescriptor(
+ new OutputDescriptor(MROutput.class.getName()).setUserPayload(createUserPayload(conf,
+ outputFormat.getName(), useNewApi)), new OutputCommitterDescriptor(
+ MROutputCommitter.class.getName()), credentials);
+ }
+
+ /**
+ * Get the credentials for the output from its {@link FileSystem}s.
+ * Use this method to turn this off when not using a {@link FileSystem}
+ * or when {@link Credentials} are not supported.
+ * @param value whether to get credentials or not. (true by default)
+ * @return {@link MROutputConfigurer}
+ */
+ public MROutputConfigurer getCredentialsForSinkFileSystem(boolean value) {
+ getCredentialsForSinkFilesystem = value;
+ return this;
+ }
+
+ /**
+ * Creates the user payload to be set on the OutputDescriptor for MROutput
+ * @param conf Configuration for the OutputFormat
+ * @param outputFormatName Name of the class of the OutputFormat
+ * @param useNewApi Use new mapreduce API or old mapred API
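+ * @return the serialized user payload for the configured output
+ * @throws TezUncheckedException if the payload cannot be created due to an {@link IOException}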
+ */
+ private byte[] createUserPayload(Configuration conf,
+ String outputFormatName, boolean useNewApi) {
+ Configuration outputConf = new JobConf(conf);
+ outputConf.setBoolean("mapred.reducer.new-api", useNewApi);
+ if (useNewApi) {
+ outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
+ } else {
+ outputConf.set("mapred.output.format.class", outputFormatName);
+ }
+ MRHelpers.translateVertexConfToTez(outputConf);
+ try {
+ MRHelpers.doJobClientMagic(outputConf);
+ return TezUtils.createUserPayloadFromConf(outputConf);
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+ }
+
+ /**
+ * Create an {@link MROutputConfigurer}
+ * @param conf Configuration for the {@link MROutput}
+ * @param outputFormat OutputFormat derived class
+ * @return {@link MROutputConfigurer}
+ */
+ public static MROutputConfigurer createtConfigurer(Configuration conf, Class<?> outputFormat) {
+ return new MROutputConfigurer(conf, outputFormat);
+ }
+
+ /**
+ * Create an {@link MROutputConfigurer} for a FileOutputFormat
+ * @param conf Configuration for the {@link MROutput}
+ * @param outputFormat FileOutputFormat derived class
+ * @param outputPath Output path
+ * @return {@link MROutputConfigurer}
+ */
+ public static MROutputConfigurer createConfigurer(Configuration conf, Class<?> outputFormat,
+ String outputPath) {
+ return new MROutputConfigurer(conf, outputFormat).setOutputPath(outputPath);
+ }
private static final Log LOG = LogFactory.getLog(MROutput.class);
@@ -85,24 +221,6 @@ public class MROutput extends AbstractLogicalOutput {
super(outputContext, numPhysicalOutputs);
}
- /**
- * Creates the user payload to be set on the OutputDescriptor for MROutput
- * @param conf Configuration for the OutputFormat
- * @param outputFormatName Name of the class of the OutputFormat
- * @param useNewApi Use new mapreduce API or old mapred API
- * @return
- * @throws IOException
- */
- public static byte[] createUserPayload(Configuration conf,
- String outputFormatName, boolean useNewApi) throws IOException {
- Configuration outputConf = new JobConf(conf);
- outputConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, outputFormatName);
- outputConf.setBoolean("mapred.mapper.new-api", useNewApi);
- MRHelpers.translateVertexConfToTez(outputConf);
- MRHelpers.doJobClientMagic(outputConf);
- return TezUtils.createUserPayloadFromConf(outputConf);
- }
-
@Override
public List<Event> initialize() throws IOException, InterruptedException {
LOG.info("Initializing Simple Output");
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java
deleted file mode 100644
index cf65abd..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseConf.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.conf;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.tez.dag.api.EdgeProperty;
-
-@InterfaceAudience.Private
-abstract class HadoopKeyValuesBasedBaseConf {
-
- /**
- * Get the payload for the configured Output
- * @return output configuration as a byte array
- */
- public abstract byte[] getOutputPayload();
-
- /**
- * Get the output class name
- * @return the output class name
- */
- public abstract String getOutputClassName();
-
- /**
- * Get the payload for the configured Input
- * @return input configuration as a byte array
- */
- public abstract byte[] getInputPayload();
-
- /**
- * Get the input class name
- * @return the input class name
- */
- public abstract String getInputClassName();
-
- public abstract static class Builder<T extends Builder<T>> implements BaseConfigurer<T> {
-
- /**
- * Enable compression for the specific Input / Output / Edge
- *
- * @param compressionCodec the codec to be used. null implies using the default
- * @return instance of the current builder
- */
- public T enableCompression(String compressionCodec) {
- return (T) this;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
new file mode 100644
index 0000000..35d023f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.conf;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
+
+ /**
+ * Get the payload for the configured Output
+ * @return output configuration as a byte array
+ */
+ public abstract byte[] getOutputPayload();
+
+ /**
+ * Get the output class name
+ * @return the output class name
+ */
+ public abstract String getOutputClassName();
+
+ /**
+ * Get the payload for the configured Input
+ * @return input configuration as a byte array
+ */
+ public abstract byte[] getInputPayload();
+
+ /**
+ * Get the input class name
+ * @return the input class name
+ */
+ public abstract String getInputClassName();
+
+ public abstract static class Builder<T extends Builder<T>> implements BaseConfigurer<T> {
+
+ /**
+ * Enable compression for the specific Input / Output / Edge
+ *
+ * @param compressionCodec the codec to be used. null implies using the default
+ * @return instance of the current builder
+ */
+ public T enableCompression(String compressionCodec) {
+ return (T) this;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
index 97712a3..fed20ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java
@@ -75,7 +75,7 @@ public class OnFileSortedOutputConfiguration {
@SuppressWarnings("rawtypes")
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+ public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
SpecificConfigurer<SpecificBuilder> {
private final E edgeBuilder;
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
index 17bb3b8..e8e8be1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java
@@ -48,7 +48,7 @@ public class OnFileUnorderedKVOutputConfiguration {
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+ public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
SpecificConfigurer<SpecificBuilder> {
private final E edgeBuilder;
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
index 2967501..95a3a3c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java
@@ -56,7 +56,7 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration {
@SuppressWarnings("rawtypes")
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+ public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
SpecificConfigurer<SpecificBuilder> {
private final E edgeBuilder;
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index 4da5060..f54f183 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -36,7 +36,7 @@ import org.apache.tez.runtime.library.output.OnFileSortedOutput;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseConf {
+public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
private final OnFileSortedOutputConfiguration outputConf;
private final ShuffledMergedInputConfiguration inputConf;
@@ -118,7 +118,7 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+ public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
private final OnFileSortedOutputConfiguration.Builder outputBuilder =
new OnFileSortedOutputConfiguration.Builder();
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
index d6abf03..dbdd2dc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java
@@ -119,7 +119,7 @@ public class ShuffledMergedInputConfiguration {
@SuppressWarnings("rawtypes")
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+ public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
SpecificConfigurer<SpecificBuilder> {
private final E edgeBuilder;
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
index 7ff8a58..f7501dd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java
@@ -88,7 +88,7 @@ public class ShuffledUnorderedKVInputConfiguration {
@SuppressWarnings("rawtypes")
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseConf.Builder> implements
+ public static class SpecificBuilder<E extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder> implements
SpecificConfigurer<SpecificBuilder> {
private final E edgeBuilder;
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
index 3f985f6..d07c68a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
@@ -38,7 +38,7 @@ import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseConf {
+public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
private final OnFileUnorderedPartitionedKVOutputConfiguration outputConf;
private final ShuffledUnorderedKVInputConfiguration inputConf;
@@ -123,7 +123,7 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+ public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
private final OnFileUnorderedPartitionedKVOutputConfiguration.Builder outputBuilder =
new OnFileUnorderedPartitionedKVOutputConfiguration.Builder();
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
index 8adfad9..6ebb259 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java
@@ -39,7 +39,7 @@ import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseConf {
+public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfigurer {
private final OnFileUnorderedKVOutputConfiguration outputConf;
private final ShuffledUnorderedKVInputConfiguration inputConf;
@@ -140,7 +140,7 @@ public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBased
@InterfaceAudience.Public
@InterfaceStability.Evolving
- public static class Builder extends HadoopKeyValuesBasedBaseConf.Builder<Builder> {
+ public static class Builder extends HadoopKeyValuesBasedBaseEdgeConfigurer.Builder<Builder> {
private final OnFileUnorderedKVOutputConfiguration.Builder outputBuilder =
new OnFileUnorderedKVOutputConfiguration.Builder();
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 0a3c06b..ae43022 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -181,9 +182,12 @@ public class TestDAGRecovery {
@Test(timeout=120000)
public void testDelayedInit() throws Exception {
DAG dag = SimpleVTestDAG.createDAG("DelayedInitDAG", null);
- dag.getVertex("v1").addDataSource("i1",
- new InputDescriptor(NoOpInput.class.getName()),
- new InputInitializerDescriptor(FailingInputInitializer.class.getName()));
+ dag.getVertex("v1").addDataSource(
+ "i1",
+ new DataSourceDescriptor(
+ new InputDescriptor(NoOpInput.class.getName()),
+ new InputInitializerDescriptor(FailingInputInitializer.class
+ .getName()), null));
runDAGAndVerify(dag, State.SUCCEEDED);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index ca8f00b..003f615 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
@@ -180,7 +181,7 @@ public class TestDAGRecovery2 {
.toUserPayload());
OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(
MultiAttemptDAG.FailingOutputCommitter.class.getName());
- dag.getVertex("v3").addDataSink("FailingOutput", od, ocd);
+ dag.getVertex("v3").addDataSink("FailingOutput", new DataSinkDescriptor(od, ocd, null));
runDAGAndVerify(dag, State.FAILED);
}
[2/2] git commit: TEZ-1317. Simplify MRinput/MROutput configuration
(bikas)
Posted by bi...@apache.org.
TEZ-1317. Simplify MRinput/MROutput configuration (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1831fda9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1831fda9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1831fda9
Branch: refs/heads/master
Commit: 1831fda98497101566665d6157ceec02ff2166a4
Parents: 5ae48c6
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Aug 5 13:05:32 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Aug 5 13:05:32 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/client/TezClientUtils.java | 36 ++-
.../main/java/org/apache/tez/dag/api/DAG.java | 29 +-
.../apache/tez/dag/api/DataSinkDescriptor.java | 72 +++++
.../tez/dag/api/DataSourceDescriptor.java | 127 +++++++++
.../java/org/apache/tez/dag/api/Vertex.java | 64 +++--
.../org/apache/tez/dag/api/VertexGroup.java | 7 +-
.../org/apache/tez/dag/api/TestDAGVerify.java | 18 +-
.../tez/dag/app/dag/impl/TestDAGImpl.java | 5 +-
.../tez/dag/history/utils/TestDAGUtils.java | 10 +-
.../mapreduce/examples/FilterLinesByWord.java | 15 +-
.../examples/FilterLinesByWordOneToOne.java | 21 +-
.../mapreduce/examples/IntersectDataGen.java | 46 +---
.../mapreduce/examples/IntersectExample.java | 67 +----
.../mapreduce/examples/IntersectValidate.java | 46 +---
.../tez/mapreduce/examples/MRRSleepJob.java | 56 ++--
.../tez/mapreduce/examples/UnionExample.java | 60 ++---
.../tez/mapreduce/examples/WordCount.java | 33 +--
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 7 +-
.../org/apache/tez/mapreduce/input/MRInput.java | 266 +++++++++++++++----
.../tez/mapreduce/input/MRInputLegacy.java | 26 ++
.../apache/tez/mapreduce/output/MROutput.java | 154 +++++++++--
.../conf/HadoopKeyValuesBasedBaseConf.java | 65 -----
.../HadoopKeyValuesBasedBaseEdgeConfigurer.java | 64 +++++
.../conf/OnFileSortedOutputConfiguration.java | 2 +-
.../OnFileUnorderedKVOutputConfiguration.java | 2 +-
...orderedPartitionedKVOutputConfiguration.java | 2 +-
.../OrderedPartitionedKVEdgeConfigurer.java | 4 +-
.../conf/ShuffledMergedInputConfiguration.java | 2 +-
.../ShuffledUnorderedKVInputConfiguration.java | 2 +-
.../UnorderedPartitionedKVEdgeConfigurer.java | 4 +-
.../UnorderedUnpartitionedKVEdgeConfigurer.java | 4 +-
.../org/apache/tez/test/TestDAGRecovery.java | 10 +-
.../org/apache/tez/test/TestDAGRecovery2.java | 3 +-
34 files changed, 877 insertions(+), 453 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 072fe55..5e4f2d8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,7 @@ INCOMPATIBLE CHANGES
Processor/Input/Output classes
TEZ-1351. MROutput needs a flush method to ensure data is materialized for
FileOutputCommitter
+ TEZ-1317. Simplify MRinput/MROutput configuration
Release 0.4.0-incubating: 2014-04-05
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index dda30c2..a82393f 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -271,6 +271,29 @@ public class TezClientUtils {
}
return fs;
}
+
+ /**
+ * Populate {@link Credentials} for the URIs to access them from their {@link FileSystem}s
+ * @param uris URIs that need to be accessed
+ * @param credentials Credentials object into which to add the credentials
+ * @param conf Configuration to access the FileSystem
+ * @throws IOException
+ */
+ public static void addFileSystemCredentialsFromURIs(Collection<URI> uris, Credentials credentials,
+ Configuration conf) throws IOException {
+ // Obtain Credentials for any paths that the user may have configured.
+ if (uris != null && !uris.isEmpty()) {
+ Iterator<Path> pathIter = Iterators.transform(uris.iterator(), new Function<URI, Path>() {
+ @Override
+ public Path apply(URI input) {
+ return new Path(input);
+ }
+ });
+
+ Path[] paths = Iterators.toArray(pathIter, Path.class);
+ TokenCache.obtainTokensForFileSystems(credentials, paths, conf);
+ }
+ }
/**
* Obtains tokens for the DAG based on the list of URIs setup in the DAG. The
@@ -303,18 +326,7 @@ public class TezClientUtils {
// Add additional credentials based on any URIs that the user may have specified.
// Obtain Credentials for any paths that the user may have configured.
- Collection<URI> uris = dag.getURIsForCredentials();
- if (uris != null && !uris.isEmpty()) {
- Iterator<Path> pathIter = Iterators.transform(uris.iterator(), new Function<URI, Path>() {
- @Override
- public Path apply(URI input) {
- return new Path(input);
- }
- });
-
- Path[] paths = Iterators.toArray(pathIter, Path.class);
- TokenCache.obtainTokensForFileSystems(dagCredentials, paths, conf);
- }
+ addFileSystemCredentialsFromURIs(dag.getURIsForCredentials(), dagCredentials, conf);
// Obtain Credentials for the local resources configured on the DAG
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 15d23e3..e754990 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -72,7 +72,7 @@ public class DAG {
final Set<Edge> edges = Sets.newHashSet();
final String name;
final Collection<URI> urisForCredentials = new HashSet<URI>();
- Credentials credentials;
+ Credentials credentials = new Credentials();
Set<VertexGroup> vertexGroups = Sets.newHashSet();
Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
@@ -134,8 +134,9 @@ public class DAG {
* need to be obtained so that the job can run. An incremental list of URIs
* can be provided by making multiple calls to the method.
*
- * Currently, credentials can only be fetched for HDFS and other
- * {@link org.apache.hadoop.fs.FileSystem} implementations.
+ * Currently, {@link Credentials} can only be fetched for HDFS and other
+ * {@link org.apache.hadoop.fs.FileSystem} implementations that support
+ * credentials.
*
* @param uris
* a list of {@link URI}s
@@ -530,6 +531,28 @@ public class DAG {
}
for (Vertex vertex : vertices.values()) {
+ // infer credentials and parallelism from data source
+ List<DataSourceDescriptor> dataSources = vertex.getDataSources();
+ for (DataSourceDescriptor dataSource : dataSources) {
+ if (dataSource.getCredentials() != null) {
+ credentials.addAll(dataSource.getCredentials());
+ }
+ }
+ if (dataSources.size() == 1) {
+ DataSourceDescriptor dataSource = dataSources.get(0);
+ if (vertex.getParallelism() == -1 && dataSource.getNumberOfShards() > -1) {
+ vertex.setParallelism(dataSource.getNumberOfShards());
+ }
+ if (vertex.getLocationHint() == null && dataSource.getLocationHint() != null) {
+ vertex.setLocationHint(dataSource.getLocationHint());
+ }
+ }
+ for (DataSinkDescriptor dataSink : vertex.getDataSinks()) {
+ if (dataSink.getCredentials() != null) {
+ credentials.addAll(dataSink.getCredentials());
+ }
+ }
+
VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
vertexBuilder.setName(vertex.getName());
vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
new file mode 100644
index 0000000..b5e4e8f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSinkDescriptor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.security.Credentials;
+
+/**
+ * Defines the output and output committer for a data sink
+ *
+ */
+public class DataSinkDescriptor {
+ private final OutputDescriptor outputDescriptor;
+ private final OutputCommitterDescriptor committerDescriptor;
+
+ private final Credentials credentials;
+
+ /**
+ * Create a {@link DataSinkDescriptor}
+ * @param outputDescriptor
+ * An {@link OutputDescriptor} for the output
+ * @param committerDescriptor
+ * Specify a committer to be used for the output. Can be null. After all
+ * tasks in the vertex (or in the DAG) have completed, the committer
+ * (if specified) is invoked to commit the outputs. Commit is a data
+ * sink specific operation that usually determines the visibility of
+ * the output to external observers. E.g. moving output files from
+ * temporary dirs to the real output dir. When there are multiple
+ * executions of a task, the commit process also helps decide which
+ * execution will be included in the final output. Users should
+ * consider whether their application or data sink need a commit
+ * operation.
+ * @param credentials Credentials needed to access the data sink
+ */
+ public DataSinkDescriptor(OutputDescriptor outputDescriptor,
+ @Nullable OutputCommitterDescriptor committerDescriptor,
+ @Nullable Credentials credentials) {
+ this.outputDescriptor = outputDescriptor;
+ this.committerDescriptor = committerDescriptor;
+ this.credentials = credentials;
+ }
+
+ public OutputDescriptor getOutputDescriptor() {
+ return outputDescriptor;
+ }
+
+ public @Nullable OutputCommitterDescriptor getOutputCommitterDescriptor() {
+ return committerDescriptor;
+ }
+
+ public @Nullable Credentials getCredentials() {
+ return credentials;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
new file mode 100644
index 0000000..51d85ae
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DataSourceDescriptor.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.security.Credentials;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+
+/**
+ * Defines the input and input initializer for a data source
+ *
+ */
+public class DataSourceDescriptor {
+ private final InputDescriptor inputDescriptor;
+ private final InputInitializerDescriptor initializerDescriptor;
+
+ private final Credentials credentials;
+ private final int numShards;
+ private final VertexLocationHint locationHint;
+
+ /**
+ * Create a {@link DataSourceDescriptor} when the data shard calculation
+ * happens in the App Master at runtime
+ * @param inputDescriptor
+ * An {@link InputDescriptor} for the Input
+ * @param credentials Credentials needed to access the data
+ * @param initializerDescriptor
+ * An initializer for this Input which may run within the AM. This
+ * can be used to set the parallelism for this vertex and generate
+ * {@link RootInputDataInformationEvent}s for the actual Input.</p>
+ * If this is not specified, the parallelism must be set for the
+ * vertex. In addition, the Input should know how to access data for
+ * each of its tasks. </p> If a {@link TezRootInputInitializer} is
+ * meant to determine the parallelism of the vertex, the initial
+ * vertex parallelism should be set to -1. Can be null.
+ */
+ public DataSourceDescriptor(InputDescriptor inputDescriptor,
+ @Nullable InputInitializerDescriptor initializerDescriptor,
+ @Nullable Credentials credentials) {
+ this(inputDescriptor, initializerDescriptor, -1, credentials, null);
+ }
+
+ /**
+ * Create a {@link DataSourceDescriptor} when the data shard calculation
+ * happens in the client at compile time
+ * @param inputDescriptor
+ * An {@link InputDescriptor} for the Input
+ * @param initializerDescriptor
+ * An initializer for this Input which may run within the AM. This
+ * can be used to set the parallelism for this vertex and generate
+ * {@link RootInputDataInformationEvent}s for the actual Input.</p>
+ * If this is not specified, the parallelism must be set for the
+ * vertex. In addition, the Input should know how to access data for
+ * each of its tasks. </p> If a {@link TezRootInputInitializer} is
+ * meant to determine the parallelism of the vertex, the initial
+ * vertex parallelism should be set to -1. Can be null.
+ * @param numShards Number of shards of data
+ * @param credentials Credentials needed to access the data
+ * @param locationHint Location hints for the vertex tasks
+ */
+ public DataSourceDescriptor(InputDescriptor inputDescriptor,
+ @Nullable InputInitializerDescriptor initializerDescriptor, int numShards,
+ @Nullable Credentials credentials, @Nullable VertexLocationHint locationHint) {
+ this.inputDescriptor = inputDescriptor;
+ this.initializerDescriptor = initializerDescriptor;
+ this.numShards = numShards;
+ this.credentials = credentials;
+ this.locationHint = locationHint;
+ }
+
+ public InputDescriptor getInputDescriptor() {
+ return inputDescriptor;
+ }
+
+ public @Nullable InputInitializerDescriptor getInputInitializerDescriptor() {
+ return initializerDescriptor;
+ }
+
+ /**
+ * Number of shards for this data source. If a vertex has only one
+ * data source then the number of tasks in the vertex should be set to
+ * the number of shards
+ * Returns -1 when this is determined at runtime in the AM.
+ * @return number of tasks
+ */
+ public int getNumberOfShards() {
+ return numShards;
+ }
+
+ /**
+ * Returns any credentials needed to access this data source.
+ * Is null when this calculation happens on the AppMaster (default)
+ * @return credentials.
+ */
+ public @Nullable Credentials getCredentials() {
+ return credentials;
+ }
+
+ /**
+ * Get the location hints for the tasks in the vertex for this data source.
+ * Is null when shard calculation happens on the AppMaster (default)
+ * @return List of {@link TaskLocationHint}
+ */
+ public @Nullable VertexLocationHint getLocationHint() {
+ return locationHint;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 25a3990..5f67ad1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -24,17 +24,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
-
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.VertexGroup.GroupInfo;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.runtime.api.LogicalIOProcessor;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
public class Vertex {
@@ -58,6 +55,8 @@ public class Vertex {
private final List<Edge> inputEdges = new ArrayList<Edge>();
private final List<Edge> outputEdges = new ArrayList<Edge>();
private final Map<String, GroupInfo> groupInputs = Maps.newHashMap();
+ private final List<DataSourceDescriptor> dataSources = Lists.newLinkedList();
+ private final List<DataSinkDescriptor> dataSinks = Lists.newLinkedList();
private String taskLaunchCmdOpts = "";
@@ -232,24 +231,16 @@ public class Vertex {
* @param inputName
* the name of the input. This will be used when accessing the input
* in the {@link LogicalIOProcessor}
- * @param inputDescriptor
- * the inputDescriptor for this input
- * @param inputInitializerDescriptor
- * An initializer for this Input which may run within the AM. This
- * can be used to set the parallelism for this vertex and generate
- * {@link RootInputDataInformationEvent}s for the actual Input.</p>
- * If this is not specified, the parallelism must be set for the
- * vertex. In addition, the Input should know how to access data for
- * each of it's tasks. </p> If a {@link TezRootInputInitializer} is
- * meant to determine the parallelism of the vertex, the initial
- * vertex parallelism should be set to -1. Can be null.
+ * @param dataSourceDescriptor
+ * the {@link DataSourceDescriptor} for this input.
* @return this Vertex
*/
- public Vertex addDataSource(String inputName, InputDescriptor inputDescriptor,
- @Nullable InputInitializerDescriptor inputInitializerDescriptor) {
+ public Vertex addDataSource(String inputName, DataSourceDescriptor dataSourceDescriptor) {
additionalInputs
.add(new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>(
- inputName, inputDescriptor, inputInitializerDescriptor));
+ inputName, dataSourceDescriptor.getInputDescriptor(),
+ dataSourceDescriptor.getInputInitializerDescriptor()));
+ this.dataSources.add(dataSourceDescriptor);
return this;
}
@@ -267,25 +258,16 @@ public class Vertex {
* @param outputName
* the name of the output. This will be used when accessing the
* output in the {@link LogicalIOProcessor}
- * @param outputDescriptor
- * @param outputCommitterDescriptor
- * Specify committer to be used for the output Can be null. After all
- * tasks in the vertex (or in the DAG) have completed, the committer
- * (if specified) is invoked to commit the outputs. Commit is a data
- * sink specific operation that usually determines the visibility of
- * the output to external observers. E.g. moving output files from
- * temporary dirs to the real output dir. When there are multiple
- * executions of a task, the commit process also helps decide which
- * execution will be included in the final output. Users should
- * consider whether their application or data sink need a commit
- * operation.
+ * @param dataSinkDescriptor
+ * the {@link DataSinkDescriptor} for this output
* @return this Vertex
*/
- public Vertex addDataSink(String outputName, OutputDescriptor outputDescriptor,
- @Nullable OutputCommitterDescriptor outputCommitterDescriptor) {
+ public Vertex addDataSink(String outputName, DataSinkDescriptor dataSinkDescriptor) {
additionalOutputs
.add(new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(
- outputName, outputDescriptor, outputCommitterDescriptor));
+ outputName, dataSinkDescriptor.getOutputDescriptor(),
+ dataSinkDescriptor.getOutputCommitterDescriptor()));
+ this.dataSinks.add(dataSinkDescriptor);
return this;
}
@@ -347,13 +329,29 @@ public class Vertex {
outputEdges.add(edge);
}
+ /**
+ * Get the input vertices for this vertex
+ * @return List of input vertices
+ */
public List<Vertex> getInputVertices() {
return Collections.unmodifiableList(inputVertices);
}
+ /**
+ * Get the output vertices for this vertex
+ * @return List of output vertices
+ */
public List<Vertex> getOutputVertices() {
return Collections.unmodifiableList(outputVertices);
}
+
+ List<DataSourceDescriptor> getDataSources() {
+ return dataSources;
+ }
+
+ List<DataSinkDescriptor> getDataSinks() {
+ return dataSinks;
+ }
List<Edge> getInputEdges() {
return inputEdges;
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
index 991350b..894d649 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexGroup.java
@@ -22,8 +22,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
-
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -90,11 +88,10 @@ public class VertexGroup {
* Add an common data sink to the group of vertices.
* Refer to {@link Vertex#addDataSink(String, OutputDescriptor, OutputCommitterDescriptor)}
*/
- public VertexGroup addDataSink(String outputName, OutputDescriptor outputDescriptor,
- @Nullable OutputCommitterDescriptor committerDescriptor) {
+ public VertexGroup addDataSink(String outputName, DataSinkDescriptor dataSinkDescriptor) {
RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> leafOutput =
new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>(outputName,
- outputDescriptor, committerDescriptor);
+ dataSinkDescriptor.getOutputDescriptor(), dataSinkDescriptor.getOutputCommitterDescriptor());
this.groupInfo.outputs.add(outputName);
// also add output to its members
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 91c04fc..b03a23b 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -482,7 +482,7 @@ public class TestDAGVerify {
new ProcessorDescriptor("MapProcessor"),
dummyTaskCount, dummyTaskResource);
- v2.addDataSource("v1", new InputDescriptor(), null);
+ v2.addDataSource("v1", new DataSourceDescriptor(null, null, null));
Edge e1 = new Edge(v1, v2,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
@@ -506,7 +506,7 @@ public class TestDAGVerify {
new ProcessorDescriptor("MapProcessor"),
dummyTaskCount, dummyTaskResource);
- v1.addDataSink("v2", new OutputDescriptor(), null);
+ v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
Edge e1 = new Edge(v1, v2,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
@@ -530,7 +530,7 @@ public class TestDAGVerify {
new ProcessorDescriptor("MapProcessor"),
dummyTaskCount, dummyTaskResource);
- v1.addDataSink("v2", new OutputDescriptor(), null);
+ v1.addDataSink("v2", new DataSinkDescriptor(null, null, null));
DAG dag = new DAG("testDag");
dag.addVertex(v1);
@@ -547,7 +547,7 @@ public class TestDAGVerify {
new ProcessorDescriptor("MapProcessor"),
dummyTaskCount, dummyTaskResource);
- v1.addDataSource("v2", new InputDescriptor(), null);
+ v1.addDataSource("v2", new DataSourceDescriptor(null, null, null));
DAG dag = new DAG("testDag");
dag.addVertex(v1);
@@ -606,7 +606,7 @@ public class TestDAGVerify {
DAG dag = new DAG("testDag");
VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
- uv12.addDataSink("uvOut", outDesc, null);
+ uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
@@ -663,7 +663,7 @@ public class TestDAGVerify {
String groupName1 = "uv12";
VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
- uv12.addDataSink("uvOut", outDesc, null);
+ uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
String groupName2 = "uv23";
VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -745,7 +745,7 @@ public class TestDAGVerify {
String groupName1 = "uv12";
VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor();
- uv12.addDataSink("uvOut", outDesc, null);
+ uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, null, null));
String groupName2 = "uv23";
VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
@@ -880,8 +880,8 @@ public class TestDAGVerify {
.getBytes());
InputDescriptor inputDescriptor2 = new InputDescriptor("input2").setUserPayload("inputBytes"
.getBytes());
- v1.addDataSource("input1", inputDescriptor1, null);
- v1.addDataSource("input2", inputDescriptor2, null);
+ v1.addDataSource("input1", new DataSourceDescriptor(inputDescriptor1, null, null));
+ v1.addDataSource("input2", new DataSourceDescriptor(inputDescriptor2, null, null));
dag.addVertex(v1);
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index c7fb4b5..6cb5d56 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.EdgeProperty;
@@ -361,8 +362,8 @@ public class TestDAGImpl {
TotalCountingOutputCommitter.class.getName());
org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor("output.class");
- uv12.addDataSink("uvOut", outDesc, ocd);
- v3.addDataSink("uvOut", outDesc, ocd);
+ uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+ v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 5f0f1c9..7ce0de3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -31,6 +31,8 @@ import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
@@ -54,8 +56,8 @@ public class TestDAGUtils {
org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"),
dummyTaskCount, dummyTaskResource);
- v1.addDataSource("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"),
- null);
+ v1.addDataSource("input1", new DataSourceDescriptor(new InputDescriptor(
+ "input.class").setHistoryText("input HistoryText"), null, null));
org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"),
dummyTaskCount, dummyTaskResource);
@@ -69,8 +71,8 @@ public class TestDAGUtils {
OutputDescriptor outDesc = new OutputDescriptor("output.class")
.setHistoryText("uvOut HistoryText");
OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(OutputCommitter.class.getName());
- uv12.addDataSink("uvOut", outDesc, ocd);
- v3.addDataSink("uvOut", outDesc, ocd);
+ uv12.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
+ v3.addDataSink("uvOut", new DataSinkDescriptor(outDesc, ocd, null));
GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 465fec2..8fe8e88 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -54,6 +54,8 @@ import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -200,10 +202,13 @@ public class FilterLinesByWord extends Configured implements Tool {
// Configure the Input for stage1
Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
: MRInputAMSplitGenerator.class;
- stage1Vertex.addDataSource("MRInput",
- new InputDescriptor(MRInputLegacy.class.getName())
- .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
- (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName())));
+ stage1Vertex.addDataSource(
+ "MRInput",
+ new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class
+ .getName()).setUserPayload(MRHelpers.createMRInputPayload(
+ stage1Payload, null)),
+ (initializerClazz == null ? null
+ : new InputInitializerDescriptor(initializerClazz.getName())), null));
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
@@ -216,7 +221,7 @@ public class FilterLinesByWord extends Configured implements Tool {
OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
.setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf));
OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
- stage2Vertex.addDataSink("MROutput", od, ocd);
+ stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 83be555..822061d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
@@ -186,10 +188,12 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
// Configure the Input for stage1
Class<? extends TezRootInputInitializer> initializerClazz = generateSplitsInClient ? null
: MRInputAMSplitGenerator.class;
- stage1Vertex.addDataSource("MRInput",
- new InputDescriptor(MRInputLegacy.class.getName())
- .setUserPayload(MRHelpers.createMRInputPayload(stage1Payload, null)),
- (initializerClazz==null ? null : new InputInitializerDescriptor(initializerClazz.getName())));
+ stage1Vertex.addDataSource(
+ "MRInput",
+ new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class
+ .getName()).setUserPayload(MRHelpers.createMRInputPayload(
+ stage1Payload, null)), (initializerClazz == null ? null
+ : new InputInitializerDescriptor(initializerClazz.getName())), null));
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
@@ -199,10 +203,11 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
stage2Vertex.setTaskLocalFiles(commonLocalResources);
// Configure the Output for stage2
- stage2Vertex.addDataSink("MROutput",
- new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers
- .createUserPayloadFromConf(stage2Conf)),
- new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+ stage2Vertex.addDataSink(
+ "MROutput",
+ new DataSinkDescriptor(new OutputDescriptor(MROutput.class.getName())
+ .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf)),
+ new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), TextLongPair.class.getName()).build();
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index d5b0eb9..d83aa34 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -23,9 +23,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.net.URI;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
@@ -36,7 +33,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
@@ -44,15 +40,12 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
@@ -185,7 +178,6 @@ public class IntersectDataGen extends Configured implements Tool {
DAG dag = createDag(tezConf, largeOutPath, smallOutPath, expectedOutputPath, numTasks,
largeOutSize, smallOutSize);
- setupURIsForCredentials(dag, largeOutPath, smallOutPath, expectedOutputPath);
tezSession.waitTillReady();
DAGClient dagClient = tezSession.submitDAG(dag);
@@ -207,22 +199,18 @@ public class IntersectDataGen extends Configured implements Tool {
DAG dag = new DAG("IntersectDataGen");
- byte[] streamOutputPayload = createPayloadForOutput(largeOutPath, tezConf);
- byte[] hashOutputPayload = createPayloadForOutput(smallOutPath, tezConf);
- byte[] expectedOutputPayload = createPayloadForOutput(expectedOutputPath, tezConf);
-
Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
largeOutSizePerTask, smallOutSizePerTask)), numTasks, MRHelpers.getMapResource(tezConf));
- genDataVertex.addDataSink(STREAM_OUTPUT_NAME,
- new OutputDescriptor(MROutput.class.getName()).setUserPayload(streamOutputPayload),
- new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
- genDataVertex.addDataSink(HASH_OUTPUT_NAME,
- new OutputDescriptor(MROutput.class.getName()).setUserPayload(hashOutputPayload),
- new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
- genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME,
- new OutputDescriptor(MROutput.class.getName()).setUserPayload(expectedOutputPayload),
- new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+ genDataVertex.addDataSink(STREAM_OUTPUT_NAME,
+ MROutput.createConfigurer(new Configuration(tezConf),
+ TextOutputFormat.class, largeOutPath.toUri().toString()).create());
+ genDataVertex.addDataSink(HASH_OUTPUT_NAME,
+ MROutput.createConfigurer(new Configuration(tezConf),
+ TextOutputFormat.class, smallOutPath.toUri().toString()).create());
+ genDataVertex.addDataSink(EXPECTED_OUTPUT_NAME,
+ MROutput.createConfigurer(new Configuration(tezConf),
+ TextOutputFormat.class, expectedOutputPath.toUri().toString()).create());
dag.addVertex(genDataVertex);
@@ -351,16 +339,6 @@ public class IntersectDataGen extends Configured implements Tool {
}
- private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
- List<URI> uris = new LinkedList<URI>();
- for (Path path : paths) {
- FileSystem fs = path.getFileSystem(getConf());
- Path qPath = fs.makeQualified(path);
- uris.add(qPath.toUri());
- }
- dag.addURIsForCredentials(uris);
- }
-
private int checkOutputDirectory(FileSystem fs, Path path) throws IOException {
if (fs.exists(path)) {
System.err.println("Output directory: " + path + " already exists");
@@ -369,10 +347,4 @@ public class IntersectDataGen extends Configured implements Tool {
return 0;
}
- private byte[] createPayloadForOutput(Path outputPath, Configuration srcConf) throws IOException {
- Configuration conf = new Configuration(srcConf);
- conf.set(FileOutputFormat.OUTDIR, outputPath.toUri().toString());
- byte[] payload = MROutput.createUserPayload(conf, TextOutputFormat.class.getName(), true);
- return payload;
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
index ef7643a..9a28014 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectExample.java
@@ -19,10 +19,8 @@
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
-import java.net.URI;
import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
+
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -33,9 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
@@ -44,18 +40,12 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
@@ -168,7 +158,6 @@ public class IntersectExample extends Configured implements Tool {
}
DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions);
- setupURIsForCredentials(dag, streamInputPath, hashInputPath, outputPath);
tezSession.waitTillReady();
DAGClient dagClient = tezSession.submitDAG(dag);
@@ -185,18 +174,6 @@ public class IntersectExample extends Configured implements Tool {
int numPartitions) throws IOException {
DAG dag = new DAG("IntersectExample");
- // Configuration for src1
- Configuration streamInputConf = new Configuration(tezConf);
- streamInputConf.set(FileInputFormat.INPUT_DIR, streamPath.toUri().toString());
- byte[] streamInputPayload = MRInput.createUserPayload(streamInputConf,
- TextInputFormat.class.getName(), true, false);
-
- // Configuration for src2
- Configuration hashInputConf = new Configuration(tezConf);
- hashInputConf.set(FileInputFormat.INPUT_DIR, hashPath.toUri().toString());
- byte[] hashInputPayload = MRInput.createUserPayload(hashInputConf,
- TextInputFormat.class.getName(), true, false);
-
// Configuration for intermediate output - shared by Vertex1 and Vertex2
// This should only be setting selective keys from the underlying conf. Fix after there's a
// better mechanism to configure the IOs.
@@ -206,32 +183,26 @@ public class IntersectExample extends Configured implements Tool {
.newBuilder(Text.class.getName(), NullWritable.class.getName(),
HashPartitioner.class.getName(), null).build();
- Configuration finalOutputConf = new Configuration(tezConf);
- finalOutputConf.set(FileOutputFormat.OUTDIR, outPath.toUri().toString());
- byte[] finalOutputPayload = MROutput.createUserPayload(finalOutputConf,
- TextOutputFormat.class.getName(), true);
-
// Change the way resources are setup - no MRHelpers
- Vertex streamFileVertex = new Vertex("partitioner1",
- new ProcessorDescriptor(ForwardingProcessor.class.getName()), -1,
- MRHelpers.getMapResource(tezConf)).addDataSource("streamfile",
- new InputDescriptor(MRInput.class.getName())
- .setUserPayload(streamInputPayload),
- new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+ Vertex streamFileVertex = new Vertex("partitioner1", new ProcessorDescriptor(
+ ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+ "streamfile",
+ MRInput
+ .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+ streamPath.toUri().toString()).groupSplitsInAM(false).create());
Vertex hashFileVertex = new Vertex("partitioner2", new ProcessorDescriptor(
- ForwardingProcessor.class.getName()), -1,
- MRHelpers.getMapResource(tezConf)).addDataSource("hashfile",
- new InputDescriptor(MRInput.class.getName())
- .setUserPayload(hashInputPayload),
- new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+ ForwardingProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf)).addDataSource(
+ "hashfile",
+ MRInput
+ .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+ hashPath.toUri().toString()).groupSplitsInAM(false).create());
Vertex intersectVertex = new Vertex("intersect", new ProcessorDescriptor(
IntersectProcessor.class.getName()), numPartitions,
MRHelpers.getReduceResource(tezConf)).addDataSink("finalOutput",
- new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(finalOutputPayload),
- new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+ MROutput.createConfigurer(new Configuration(tezConf),
+ TextOutputFormat.class, outPath.toUri().toString()).create());
Edge e1 = new Edge(streamFileVertex, intersectVertex, edgeConf.createDefaultEdgeProperty());
@@ -242,16 +213,6 @@ public class IntersectExample extends Configured implements Tool {
return dag;
}
- private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
- List<URI> uris = new LinkedList<URI>();
- for (Path path : paths) {
- FileSystem fs = path.getFileSystem(getConf());
- Path qPath = fs.makeQualified(path);
- uris.add(qPath.toUri());
- }
- dag.addURIsForCredentials(uris);
- }
-
// private void obtainTokens(Credentials credentials, Path... paths) throws IOException {
// TokenCache.obtainTokensForNamenodes(credentials, paths, getConf());
// }
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index b0a5c6c..ac671a4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -19,19 +19,14 @@
package org.apache.tez.mapreduce.examples;
import java.io.IOException;
-import java.net.URI;
-import java.util.LinkedList;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
@@ -41,8 +36,6 @@ import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
@@ -50,7 +43,6 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.IntersectExample.ForwardingProcessor;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
@@ -160,7 +152,6 @@ public class IntersectValidate extends Configured implements Tool {
Path rhsPath = new Path(rhsDir);
DAG dag = createDag(tezConf, lhsPath, rhsPath, numPartitions);
- setupURIsForCredentials(dag, lhsPath, rhsPath);
tezSession.waitTillReady();
DAGClient dagClient = tezSession.submitDAG(dag);
@@ -191,18 +182,6 @@ public class IntersectValidate extends Configured implements Tool {
throws IOException {
DAG dag = new DAG("IntersectValidate");
- // Configuration for src1
- Configuration lhsInputConf = new Configuration(tezConf);
- lhsInputConf.set(FileInputFormat.INPUT_DIR, lhs.toUri().toString());
- byte[] streamInputPayload = MRInput.createUserPayload(lhsInputConf,
- TextInputFormat.class.getName(), true, false);
-
- // Configuration for src2
- Configuration rhsInputConf = new Configuration(tezConf);
- rhsInputConf.set(FileInputFormat.INPUT_DIR, rhs.toUri().toString());
- byte[] hashInputPayload = MRInput.createUserPayload(rhsInputConf,
- TextInputFormat.class.getName(), true, false);
-
// Configuration for intermediate output - shared by Vertex1 and Vertex2
// This should only be setting selective keys from the underlying conf. Fix after there's a
// better mechanism to configure the IOs.
@@ -213,15 +192,19 @@ public class IntersectValidate extends Configured implements Tool {
// Change the way resources are setup - no MRHelpers
Vertex lhsVertex = new Vertex(LHS_INPUT_NAME, new ProcessorDescriptor(
ForwardingProcessor.class.getName()), -1,
- MRHelpers.getMapResource(tezConf)).addDataSource("lhs", new InputDescriptor(
- MRInput.class.getName()).setUserPayload(streamInputPayload),
- new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+ MRHelpers.getMapResource(tezConf)).addDataSource(
+ "lhs",
+ MRInput
+ .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+ lhs.toUri().toString()).groupSplitsInAM(false).create());
Vertex rhsVertex = new Vertex(RHS_INPUT_NAME, new ProcessorDescriptor(
ForwardingProcessor.class.getName()), -1,
- MRHelpers.getMapResource(tezConf)).addDataSource("rhs", new InputDescriptor(
- MRInput.class.getName()).setUserPayload(hashInputPayload),
- new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
+ MRHelpers.getMapResource(tezConf)).addDataSource(
+ "rhs",
+ MRInput
+ .createConfigurer(new Configuration(tezConf), TextInputFormat.class,
+ rhs.toUri().toString()).groupSplitsInAM(false).create());
Vertex intersectValidateVertex = new Vertex("intersectvalidate",
new ProcessorDescriptor(IntersectValidateProcessor.class.getName()),
@@ -278,13 +261,4 @@ public class IntersectValidate extends Configured implements Tool {
}
}
- private void setupURIsForCredentials(DAG dag, Path... paths) throws IOException {
- List<URI> uris = new LinkedList<URI>();
- for (Path path : paths) {
- FileSystem fs = path.getFileSystem(getConf());
- Path qPath = fs.makeQualified(path);
- uris.add(qPath.toUri());
- }
- dag.addURIsForCredentials(uris);
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index ab75441..7279136 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -63,20 +63,21 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
@@ -479,6 +480,8 @@ public class MRRSleepJob extends Configured implements Tool {
MRHelpers.doJobClientMagic(finalReduceConf);
}
+ DataSourceDescriptor dataSource = null;
+ List<TaskLocationHint> taskLocHint = null;
InputSplitInfo inputSplitInfo = null;
if (!generateSplitsInAM) {
if (writeSplitsToDFS) {
@@ -491,19 +494,19 @@ public class MRRSleepJob extends Configured implements Tool {
} catch (ClassNotFoundException e) {
throw new TezUncheckedException("Failed to generate input splits", e);
}
- } else {
- try {
- LOG.info("Creating in-mem splits");
- inputSplitInfo = MRHelpers.generateInputSplitsToMem(mapStageConf);
- } catch (ClassNotFoundException e) {
- throw new TezUncheckedException("Could not generate input splits", e);
- } catch (InterruptedException e) {
- throw new TezUncheckedException("Could not generate input splits", e);
+ if (inputSplitInfo.getCredentials() != null) {
+ this.credentials.addAll(inputSplitInfo.getCredentials());
}
+ taskLocHint = inputSplitInfo.getTaskLocationHints();
+ byte[] mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
+ InputDescriptor id = new InputDescriptor(MRInput.class.getName()).setUserPayload(mapInputPayload);
+ dataSource = new DataSourceDescriptor(id, null, null);
+ } else {
+ dataSource = MRInput.createConfigurer(mapStageConf, SleepInputFormat.class).
+ generateSplitsInAM(false).create();
}
- if (inputSplitInfo.getCredentials() != null) {
- this.credentials.addAll(inputSplitInfo.getCredentials());
- }
+ } else {
+ dataSource = MRInput.createConfigurer(mapStageConf, SleepInputFormat.class).create();
}
DAG dag = new DAG("MRRSleepJob");
@@ -533,24 +536,15 @@ public class MRRSleepJob extends Configured implements Tool {
List<Vertex> vertices = new ArrayList<Vertex>();
- byte[] mapInputPayload = null;
byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
- if (writeSplitsToDFS || generateSplitsInAM) {
- mapInputPayload = MRHelpers.createMRInputPayload(mapUserPayload, null);
- } else {
- mapInputPayload = MRHelpers.createMRInputPayload(
- mapUserPayload, inputSplitInfo.getSplitsProto());
- }
int numTasks = generateSplitsInAM ? -1 : numMapper;
Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
MapProcessor.class.getName()).setUserPayload(mapUserPayload),
numTasks, MRHelpers.getMapResource(mapStageConf));
- if (!generateSplitsInAM) {
- mapVertex.setLocationHint(new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
- }
-
+
if (writeSplitsToDFS) {
+ mapVertex.setLocationHint(new VertexLocationHint(taskLocHint));
Map<String, LocalResource> mapLocalResources = new HashMap<String, LocalResource>();
mapLocalResources.putAll(commonLocalResources);
MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
@@ -560,17 +554,7 @@ public class MRRSleepJob extends Configured implements Tool {
mapVertex.setTaskLocalFiles(commonLocalResources);
}
- if (generateSplitsInAM) {
- MRHelpers.addMRInput(mapVertex, mapInputPayload,
- new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()));
- } else {
- if (writeSplitsToDFS) {
- MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
- } else {
- MRHelpers.addMRInput(mapVertex, mapInputPayload,
- new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()));
- }
- }
+ mapVertex.addDataSource("MRInput", dataSource);
vertices.add(mapVertex);
if (iReduceStagesCount > 0
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
index 4455bb4..259a314 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/UnionExample.java
@@ -30,21 +30,18 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.GroupInputEdge;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
@@ -52,10 +49,9 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.input.MRInput.MRInputConfigurer;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
@@ -174,27 +170,24 @@ public class UnionExample {
int numMaps = -1;
Configuration inputConf = new Configuration(tezConf);
- inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
- InputDescriptor id = new InputDescriptor(MRInput.class.getName())
- .setUserPayload(MRInput.createUserPayload(inputConf,
- TextInputFormat.class.getName(), true, true));
+ MRInputConfigurer configurer = MRInput.createConfigurer(inputConf, TextInputFormat.class,
+ inputPath);
+ DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).create();
Vertex mapVertex1 = new Vertex("map1", new ProcessorDescriptor(
TokenProcessor.class.getName()),
numMaps, MRHelpers.getMapResource(tezConf));
- InputInitializerDescriptor iid =
- new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName());
- mapVertex1.addDataSource("MRInput", id, iid);
+ mapVertex1.addDataSource("MRInput", dataSource);
Vertex mapVertex2 = new Vertex("map2", new ProcessorDescriptor(
TokenProcessor.class.getName()),
numMaps, MRHelpers.getMapResource(tezConf));
- mapVertex2.addDataSource("MRInput", id, iid);
+ mapVertex2.addDataSource("MRInput", dataSource);
Vertex mapVertex3 = new Vertex("map3", new ProcessorDescriptor(
TokenProcessor.class.getName()),
numMaps, MRHelpers.getMapResource(tezConf));
- mapVertex3.addDataSource("MRInput", id, iid);
+ mapVertex3.addDataSource("MRInput", dataSource);
Vertex checkerVertex = new Vertex("checker",
new ProcessorDescriptor(
@@ -202,28 +195,21 @@ public class UnionExample {
1, MRHelpers.getReduceResource(tezConf));
Configuration outputConf = new Configuration(tezConf);
- outputConf.set(FileOutputFormat.OUTDIR, outputPath);
- OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MROutput.createUserPayload(
- outputConf, TextOutputFormat.class.getName(), true));
- OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
- checkerVertex.addDataSink("union", od, ocd);
+ DataSinkDescriptor od = MROutput.createConfigurer(outputConf,
+ TextOutputFormat.class, outputPath).create();
+ checkerVertex.addDataSink("union", od);
+
Configuration allPartsConf = new Configuration(tezConf);
- allPartsConf.set(FileOutputFormat.OUTDIR, outputPath+"-all-parts");
- OutputDescriptor od2 = new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MROutput.createUserPayload(
- allPartsConf, TextOutputFormat.class.getName(), true));
- checkerVertex.addDataSink("all-parts", od2, ocd);
-
- Configuration partsConf = new Configuration(tezConf);
- partsConf.set(FileOutputFormat.OUTDIR, outputPath+"-parts");
-
+ DataSinkDescriptor od2 = MROutput.createConfigurer(allPartsConf,
+ TextOutputFormat.class, outputPath + "-all-parts").create();
+ checkerVertex.addDataSink("all-parts", od2);
+
+ Configuration partsConf = new Configuration(tezConf);
+ DataSinkDescriptor od1 = MROutput.createConfigurer(partsConf,
+ TextOutputFormat.class, outputPath + "-parts").create();
VertexGroup unionVertex = dag.createVertexGroup("union", mapVertex1, mapVertex2);
- OutputDescriptor od1 = new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MROutput.createUserPayload(
- partsConf, TextOutputFormat.class.getName(), true));
- unionVertex.addDataSink("parts", od1, ocd);
+ unionVertex.addDataSink("parts", od1);
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), IntWritable.class.getName(),
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
index 0de2b04..ab5ac57 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/WordCount.java
@@ -29,9 +29,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
@@ -40,18 +38,14 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.OutputCommitterDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.committer.MROutputCommitter;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
@@ -123,29 +117,20 @@ public class WordCount extends Configured implements Tool {
Map<String, LocalResource> localResources, Path stagingDir,
String inputPath, String outputPath) throws IOException {
- Configuration inputConf = new Configuration(tezConf);
- inputConf.set(FileInputFormat.INPUT_DIR, inputPath);
- InputDescriptor id = new InputDescriptor(MRInput.class.getName())
- .setUserPayload(MRInput.createUserPayload(inputConf,
- TextInputFormat.class.getName(), true, true));
- InputInitializerDescriptor iid = new InputInitializerDescriptor(
- MRInputAMSplitGenerator.class.getName());
-
- Configuration outputConf = new Configuration(tezConf);
- outputConf.set(FileOutputFormat.OUTDIR, outputPath);
- OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MROutput.createUserPayload(
- outputConf, TextOutputFormat.class.getName(), true));
- OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
+ DataSourceDescriptor dataSource = MRInput.createConfigurer(new Configuration(tezConf),
+ TextInputFormat.class, inputPath).create();
+
+ DataSinkDescriptor dataSink = MROutput.createConfigurer(new Configuration(tezConf),
+ TextOutputFormat.class, outputPath).create();
Vertex tokenizerVertex = new Vertex("tokenizer", new ProcessorDescriptor(
TokenProcessor.class.getName()), -1, MRHelpers.getMapResource(tezConf));
- tokenizerVertex.addDataSource("MRInput", id, iid);
+ tokenizerVertex.addDataSource("MRInput", dataSource);
Vertex summerVertex = new Vertex("summer",
new ProcessorDescriptor(
SumProcessor.class.getName()), 1, MRHelpers.getReduceResource(tezConf));
- summerVertex.addDataSink("MROutput", od, ocd);
+ summerVertex.addDataSink("MROutput", dataSink);
OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
.newBuilder(Text.class.getName(), IntWritable.class.getName(),
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index c6b2fe4..48679c6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -63,6 +63,8 @@ import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezYARNUtils;
import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
@@ -983,14 +985,15 @@ public class MRHelpers {
InputInitializerDescriptor initClazz) {
InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName())
.setUserPayload(userPayload);
- vertex.addDataSource("MRInput", id, initClazz);
+ vertex.addDataSource("MRInput", new DataSourceDescriptor(id, initClazz, null));
}
@Private
public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) {
OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
.setUserPayload(userPayload);
- vertex.addDataSink("MROutput", od, new OutputCommitterDescriptor(MROutputCommitter.class.getName()));
+ vertex.addDataSink("MROutput", new DataSinkDescriptor(od,
+ new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/1831fda9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index e36eb4d..bcb8f37 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,6 +18,7 @@
package org.apache.tez.mapreduce.input;
import java.io.IOException;
+import java.net.URI;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -26,13 +27,25 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.input.base.MRInputBase;
@@ -48,6 +61,7 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.library.api.KeyValueReader;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
* {@link MRInput} is an {@link Input} which provides key/values pairs
@@ -61,6 +75,208 @@ import com.google.common.base.Preconditions;
public class MRInput extends MRInputBase {
+  /**
+   * Fluent helper that assembles the {@link DataSourceDescriptor} for an
+   * {@link MRInput}. Obtain an instance from one of the
+   * {@code MRInput.createConfigurer(...)} factories, optionally adjust the
+   * options below and then call {@link #create()}.
+   */
+  public static class MRInputConfigurer {
+    /** Base configuration; every descriptor is built from a {@link JobConf} copy of it. */
+    final Configuration conf;
+    /** InputFormat class supplied by the user (old mapred or new mapreduce API). */
+    final Class<?> inputFormat;
+    /** True when {@link #inputFormat} derives from the new mapreduce API InputFormat. */
+    boolean useNewApi;
+    boolean groupSplitsInAM = true;
+    boolean generateSplitsInAM = true;
+    /** Class name placed in the {@link InputDescriptor}; defaults to {@link MRInput}. */
+    String inputClassName = MRInput.class.getName();
+    boolean getCredentialsForSourceFilesystem = true;
+    /** Comma separated input paths; stays null unless the path based factory was used. */
+    String inputPaths = null;
+
+    /**
+     * Determines from the class hierarchy of {@code inputFormat} whether the
+     * old (mapred) or the new (mapreduce) API is in use.
+     *
+     * @param conf Configuration to build the input from
+     * @param inputFormat InputFormat derived class
+     * @throws TezUncheckedException if inputFormat derives from neither API's InputFormat
+     */
+    private MRInputConfigurer(Configuration conf, Class<?> inputFormat) {
+      this.conf = conf;
+      this.inputFormat = inputFormat;
+      if (org.apache.hadoop.mapred.InputFormat.class.isAssignableFrom(inputFormat)) {
+        useNewApi = false;
+      } else if (org.apache.hadoop.mapreduce.InputFormat.class.isAssignableFrom(inputFormat)) {
+        useNewApi = true;
+      } else {
+        // X.isAssignableFrom(inputFormat) means inputFormat is assignable TO X.
+        throw new TezUncheckedException("inputFormat must be assignable to either " +
+            "org.apache.hadoop.mapred.InputFormat or " +
+            "org.apache.hadoop.mapreduce.InputFormat." +
+            " Given: " + inputFormat.getName());
+      }
+    }
+
+    /**
+     * Override the Input class named in the {@link InputDescriptor}.
+     *
+     * @param className fully qualified name of the Input class to use
+     * @return {@link MRInputConfigurer}
+     */
+    MRInputConfigurer setInputClassName(String className) {
+      this.inputClassName = className;
+      return this;
+    }
+
+    /**
+     * Store the input paths in the configuration under
+     * {@link FileInputFormat#INPUT_DIR} and remember them for credential lookup.
+     *
+     * @param inputPaths Comma separated input paths
+     * @return {@link MRInputConfigurer}
+     * @throws TezUncheckedException if inputFormat is not a FileInputFormat of either API
+     */
+    private MRInputConfigurer setInputPaths(String inputPaths) {
+      if (!(org.apache.hadoop.mapred.FileInputFormat.class.isAssignableFrom(inputFormat) ||
+          FileInputFormat.class.isAssignableFrom(inputFormat))) {
+        throw new TezUncheckedException("When setting inputPaths the inputFormat must be " +
+            "assignable to either org.apache.hadoop.mapred.FileInputFormat or " +
+            "org.apache.hadoop.mapreduce.lib.input.FileInputFormat. " +
+            "Otherwise use the non-path configurer." +
+            " Given: " + inputFormat.getName());
+      }
+      conf.set(FileInputFormat.INPUT_DIR, inputPaths);
+      this.inputPaths = inputPaths;
+      return this;
+    }
+
+    /**
+     * Set whether splits should be grouped in the Tez App Master (default true).
+     * The flag is only consulted when splits are also generated in the AM.
+     *
+     * @param value whether to group splits in the AM or not
+     * @return {@link MRInputConfigurer}
+     */
+    public MRInputConfigurer groupSplitsInAM(boolean value) {
+      groupSplitsInAM = value;
+      return this;
+    }
+
+    /**
+     * Set whether splits should be generated in the Tez App Master (default true).
+     * When false, {@link #create()} generates the splits itself and ships them
+     * in the payload.
+     *
+     * @param value whether to generate splits in the AM or not
+     * @return {@link MRInputConfigurer}
+     */
+    public MRInputConfigurer generateSplitsInAM(boolean value) {
+      generateSplitsInAM = value;
+      return this;
+    }
+
+    /**
+     * Set whether credentials are obtained for the inputPaths from their
+     * {@link FileSystem}s (true by default). Turn this off when not using a
+     * {@link FileSystem} or when {@link Credentials} are not supported.
+     *
+     * @param value whether to get credentials or not
+     * @return {@link MRInputConfigurer}
+     */
+    public MRInputConfigurer getCredentialsForSourceFileSystem(boolean value) {
+      getCredentialsForSourceFilesystem = value;
+      return this;
+    }
+
+    /**
+     * Create the {@link DataSourceDescriptor}.
+     *
+     * @return {@link DataSourceDescriptor}
+     * @throws TezUncheckedException wrapping any failure raised while building the descriptor
+     */
+    public DataSourceDescriptor create() {
+      try {
+        return generateSplitsInAM ? createGeneratorDataSource() : createDistributorDataSource();
+      } catch (TezUncheckedException e) {
+        // Already unchecked: rethrow as-is instead of wrapping it a second time.
+        throw e;
+      } catch (Exception e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+
+    /**
+     * Record the API flavour and the InputFormat class name in the given conf.
+     *
+     * @param inputConf configuration to update
+     * @param inputFormatClassName class name stored under the key of the API in use
+     */
+    private void setInputFormatInConf(Configuration inputConf, String inputFormatClassName) {
+      inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
+      if (useNewApi) {
+        inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, inputFormatClassName);
+      } else {
+        inputConf.set("mapred.input.format.class", inputFormatClassName);
+      }
+    }
+
+    /**
+     * Build a data source whose splits are generated here, embedded in the
+     * payload and handed out by {@link MRInputSplitDistributor}.
+     */
+    private DataSourceDescriptor createDistributorDataSource() throws IOException {
+      Configuration inputConf = new JobConf(conf);
+      // The conf is the only argument split generation receives, so the API
+      // flavour and the InputFormat class must be stored in it beforehand.
+      setInputFormatInConf(inputConf, inputFormat.getName());
+      InputSplitInfo inputSplitInfo;
+      try {
+        inputSplitInfo = MRHelpers.generateInputSplitsToMem(inputConf);
+      } catch (Exception e) {
+        throw new TezUncheckedException(e);
+      }
+      MRHelpers.translateVertexConfToTez(inputConf);
+      MRHelpers.doJobClientMagic(inputConf);
+      byte[] payload = MRHelpers.createMRInputPayload(inputConf, inputSplitInfo.getSplitsProto());
+      // On this path the credentials come from the generated split info.
+      Credentials credentials = null;
+      if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
+        credentials = inputSplitInfo.getCredentials();
+      }
+      return new DataSourceDescriptor(
+          new InputDescriptor(inputClassName).setUserPayload(payload),
+          new InputInitializerDescriptor(MRInputSplitDistributor.class.getName()),
+          inputSplitInfo.getNumTasks(), credentials,
+          new VertexLocationHint(inputSplitInfo.getTaskLocationHints()));
+    }
+
+    /**
+     * Build a data source that uses {@link MRInputAMSplitGenerator} as its
+     * initializer, optionally with split grouping.
+     */
+    private DataSourceDescriptor createGeneratorDataSource() throws IOException {
+      Configuration inputConf = new JobConf(conf);
+      // With grouping, the conf names the grouping InputFormat of the matching
+      // API and the user's InputFormat travels separately in the payload.
+      String configInputFormatClassName;
+      if (!groupSplitsInAM) {
+        configInputFormatClassName = inputFormat.getName();
+      } else if (useNewApi) {
+        configInputFormatClassName = TezGroupedSplitsInputFormat.class.getName();
+      } else {
+        configInputFormatClassName =
+            org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.class.getName();
+      }
+      setInputFormatInConf(inputConf, configInputFormatClassName);
+      MRHelpers.translateVertexConfToTez(inputConf);
+      MRHelpers.doJobClientMagic(inputConf);
+
+      Credentials credentials = null;
+      if (getCredentialsForSourceFilesystem && inputPaths != null) {
+        // Qualify every path against its FileSystem so credentials can be
+        // requested per URI. IOExceptions propagate to create(), which wraps them.
+        List<URI> uris = Lists.newLinkedList();
+        for (String inputPath : inputPaths.split(",")) {
+          Path path = new Path(inputPath);
+          FileSystem fs = path.getFileSystem(conf);
+          uris.add(fs.makeQualified(path).toUri());
+        }
+        credentials = new Credentials();
+        TezClientUtils.addFileSystemCredentialsFromURIs(uris, credentials, conf);
+      }
+
+      byte[] payload;
+      if (groupSplitsInAM) {
+        payload = MRHelpers.createMRInputPayloadWithGrouping(inputConf, inputFormat.getName());
+      } else {
+        payload = MRHelpers.createMRInputPayload(inputConf, null);
+      }
+      return new DataSourceDescriptor(
+          new InputDescriptor(inputClassName).setUserPayload(payload),
+          new InputInitializerDescriptor(MRInputAMSplitGenerator.class.getName()), credentials);
+    }
+  }
+
+  /**
+   * Obtain an {@link MRInputConfigurer} for an arbitrary InputFormat.
+   * @param conf Configuration the {@link MRInput} is set up from
+   * @param inputFormat class derived from InputFormat
+   * @return a new {@link MRInputConfigurer}
+   */
+  public static MRInputConfigurer createConfigurer(Configuration conf, Class<?> inputFormat) {
+    MRInputConfigurer configurer = new MRInputConfigurer(conf, inputFormat);
+    return configurer;
+  }
+
+  /**
+   * Obtain an {@link MRInputConfigurer} for a FileInputFormat that reads the
+   * given paths.
+   * @param conf Configuration the {@link MRInput} is set up from
+   * @param inputFormat class derived from FileInputFormat
+   * @param inputPaths input paths, separated by commas
+   * @return a new {@link MRInputConfigurer} with the input paths applied
+   */
+  public static MRInputConfigurer createConfigurer(Configuration conf, Class<?> inputFormat,
+      String inputPaths) {
+    MRInputConfigurer configurer = new MRInputConfigurer(conf, inputFormat);
+    return configurer.setInputPaths(inputPaths);
+  }
+
private static final Log LOG = LogFactory.getLog(MRInput.class);
private final ReentrantLock rrLock = new ReentrantLock();
@@ -83,54 +299,6 @@ public class MRInput extends MRInputBase {
super(inputContext, numPhysicalInputs);
}
- /**
- * Helper API to generate the user payload for the MRInput and
- * MRInputAMSplitGenerator (if used). The InputFormat will be invoked by Tez
- * at DAG runtime to generate the input splits.
- *
- * @param conf
- * Configuration for the InputFormat
- * @param inputFormatClassName
- * Name of the class of the InputFormat
- * @param useNewApi
- * use new mapreduce API or old mapred API
- * @param groupSplitsInAM
- * do grouping of splits in the AM. If true then splits generated by
- * the InputFormat will be grouped in the AM based on available
- * resources, locality etc. This option may be set to true only when
- * using MRInputAMSplitGenerator as the initializer class in
- * {@link Vertex#addDataSource(String, org.apache.tez.dag.api.InputDescriptor,
- * org.apache.tez.dag.api.InputInitializerDescriptor)}
- * @return returns the user payload to be set on the InputDescriptor of
- * MRInput
- * @throws IOException
- */
- public static byte[] createUserPayload(Configuration conf,
- String inputFormatClassName, boolean useNewApi, boolean groupSplitsInAM)
- throws IOException {
- Configuration inputConf = new JobConf(conf);
- String wrappedInputFormatClassName = null;
- String configInputFormatClassName = null;
- if (groupSplitsInAM) {
- wrappedInputFormatClassName = inputFormatClassName;
- configInputFormatClassName = TezGroupedSplitsInputFormat.class.getName();
- } else {
- wrappedInputFormatClassName = null;
- configInputFormatClassName = inputFormatClassName;
- }
- inputConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
- configInputFormatClassName);
- inputConf.setBoolean("mapred.mapper.new-api", useNewApi);
- MRHelpers.translateVertexConfToTez(inputConf);
- MRHelpers.doJobClientMagic(inputConf);
- if (groupSplitsInAM) {
- return MRHelpers.createMRInputPayloadWithGrouping(inputConf,
- wrappedInputFormatClassName);
- } else {
- return MRHelpers.createMRInputPayload(inputConf, null);
- }
- }
-
@Override
public List<Event> initialize() throws IOException {
super.initialize();