You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/02 20:08:31 UTC
[4/5] flink git commit: Added support for Apache Tez as an execution
environment
Added support for Apache Tez as an execution environment
This closes #189
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cbbd328
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cbbd328
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cbbd328
Branch: refs/heads/master
Commit: 2cbbd3287252fd33472ae52cfa6a6b38b3b1c39e
Parents: 2ffefdf
Author: Kostas Tzoumas <ko...@gmail.com>
Authored: Tue Jan 6 15:01:03 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Apr 2 20:07:58 2015 +0200
----------------------------------------------------------------------
.travis.yml | 2 +-
docs/_includes/sidenav.html | 3 +-
docs/flink_on_tez_guide.md | 292 ++++++++++
docs/img/flink_on_tez_translation.png | Bin 0 -> 350867 bytes
docs/img/flink_tez_vertex.png | Bin 0 -> 105544 bytes
.../scala/graph/TransitiveClosureNaive.scala | 2 +-
flink-quickstart/flink-tez-quickstart/pom.xml | 37 ++
.../src/main/java/Dummy.java | 28 +
.../META-INF/maven/archetype-metadata.xml | 36 ++
.../main/resources/archetype-resources/pom.xml | 186 ++++++
.../src/assembly/flink-fat-jar.xml | 40 ++
.../src/main/java/Driver.java | 115 ++++
.../src/main/java/LocalJob.java | 72 +++
.../src/main/java/LocalWordCount.java | 96 ++++
.../src/main/java/YarnJob.java | 75 +++
.../src/main/java/YarnWordCount.java | 124 ++++
.../projects/testArtifact/archetype.properties | 21 +
.../resources/projects/testArtifact/goal.txt | 1 +
flink-quickstart/pom.xml | 10 +
.../runtime/operators/util/TaskConfig.java | 3 +-
flink-staging/flink-tez/pom.xml | 224 ++++++++
.../flink-tez/src/assembly/flink-fat-jar.xml | 42 ++
.../flink/tez/client/LocalTezEnvironment.java | 71 +++
.../flink/tez/client/RemoteTezEnvironment.java | 78 +++
.../apache/flink/tez/client/TezExecutor.java | 198 +++++++
.../flink/tez/client/TezExecutorTool.java | 80 +++
.../flink/tez/dag/FlinkBroadcastEdge.java | 70 +++
.../flink/tez/dag/FlinkDataSinkVertex.java | 61 ++
.../flink/tez/dag/FlinkDataSourceVertex.java | 82 +++
.../org/apache/flink/tez/dag/FlinkEdge.java | 45 ++
.../apache/flink/tez/dag/FlinkForwardEdge.java | 71 +++
.../flink/tez/dag/FlinkPartitionEdge.java | 71 +++
.../flink/tez/dag/FlinkProcessorVertex.java | 61 ++
.../apache/flink/tez/dag/FlinkUnionVertex.java | 61 ++
.../org/apache/flink/tez/dag/FlinkVertex.java | 114 ++++
.../apache/flink/tez/dag/TezDAGGenerator.java | 460 +++++++++++++++
.../tez/examples/ConnectedComponentsStep.java | 203 +++++++
.../flink/tez/examples/ExampleDriver.java | 119 ++++
.../flink/tez/examples/PageRankBasicStep.java | 241 ++++++++
.../apache/flink/tez/examples/TPCHQuery3.java | 224 ++++++++
.../examples/TransitiveClosureNaiveStep.java | 135 +++++
.../apache/flink/tez/examples/WordCount.java | 129 +++++
.../flink/tez/runtime/DataSinkProcessor.java | 228 ++++++++
.../flink/tez/runtime/DataSourceProcessor.java | 190 +++++++
.../flink/tez/runtime/RegularProcessor.java | 128 +++++
.../tez/runtime/TezRuntimeEnvironment.java | 50 ++
.../org/apache/flink/tez/runtime/TezTask.java | 570 +++++++++++++++++++
.../apache/flink/tez/runtime/TezTaskConfig.java | 163 ++++++
.../flink/tez/runtime/UnionProcessor.java | 106 ++++
.../flink/tez/runtime/input/FlinkInput.java | 139 +++++
.../runtime/input/FlinkInputSplitGenerator.java | 94 +++
.../tez/runtime/input/TezReaderIterator.java | 66 +++
.../tez/runtime/output/SimplePartitioner.java | 35 ++
.../tez/runtime/output/TezChannelSelector.java | 36 ++
.../tez/runtime/output/TezOutputCollector.java | 72 +++
.../tez/runtime/output/TezOutputEmitter.java | 190 +++++++
.../apache/flink/tez/util/DummyInvokable.java | 51 ++
.../apache/flink/tez/util/EncodingUtils.java | 64 +++
.../flink/tez/util/FlinkSerialization.java | 310 ++++++++++
.../src/main/resources/log4j.properties | 30 +
.../tez/test/ConnectedComponentsStepITCase.java | 83 +++
.../flink/tez/test/PageRankBasicStepITCase.java | 54 ++
.../flink/tez/test/TezProgramTestBase.java | 104 ++++
.../flink/tez/test/WebLogAnalysisITCase.java | 48 ++
.../apache/flink/tez/test/WordCountITCase.java | 47 ++
.../src/test/resources/log4j-test.properties | 30 +
.../src/test/resources/logback-test.xml | 37 ++
flink-staging/pom.xml | 12 +-
.../flink/test/testdata/WebLogAnalysisData.java | 149 +++++
.../flink/test/testdata/WebLogAnalysisData.java | 149 -----
pom.xml | 111 ++--
71 files changed, 7119 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index bf3e0b4..2e6a041 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -23,7 +23,7 @@ matrix:
- jdk: "openjdk6" # we must use openjdk6 here to deploy a java6 compatible uber-jar for YARN
env: PROFILE="-Dhadoop.version=2.2.0"
- jdk: "oraclejdk8"
- env: PROFILE="-Dhadoop.version=2.6.0 -Dscala-2.11 -Dmaven.javadoc.skip=true"
+ env: PROFILE="-Dhadoop.version=2.6.0 -Dscala-2.11 -Pinclude-tez -Dmaven.javadoc.skip=true"
git:
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 66243fd..37bba25 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -41,7 +41,8 @@ under the License.
<li><div class="sidenav-item"><a href="{{ site.baseurl }}/spargel_guide.html">Spargel Graph API</a></div></li>
<li><div class="sidenav-item"><a href="{{ site.baseurl }}/gelly_guide.html">Gelly Graph API</a></div></li>
<li><div class="sidenav-item"><a href="{{ site.baseurl }}/linq.html">Language-Integrated Queries</a></div></li>
- <li><div class="sidenav-item-bottom"><a href="{{ site.baseurl }}/hadoop_compatibility.html">Hadoop Compatibility</a></div></li>
+ <li><div class="sidenav-item"><a href="{{ site.baseurl }}/hadoop_compatibility.html">Hadoop Compatibility</a></div></li>
+ <li><div class="sidenav-item-bottom"><a href="{{ site.baseurl }}/flink_on_tez_guide.html">Running Flink on Tez</a></div></li>
<li><div class="sidenav-category">Examples</div></li>
<li><div class="sidenav-item"><a href="{{ site.baseurl }}/examples.html">Bundled Examples</a></div></li>
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/flink_on_tez_guide.md
----------------------------------------------------------------------
diff --git a/docs/flink_on_tez_guide.md b/docs/flink_on_tez_guide.md
new file mode 100644
index 0000000..08aed28
--- /dev/null
+++ b/docs/flink_on_tez_guide.md
@@ -0,0 +1,292 @@
+---
+title: "Running Flink on YARN leveraging Tez"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+
+<a href="#top"></a>
+
+## Introduction
+
+You can run Flink using Tez as an execution environment. Flink on Tez
+is currently included in *flink-staging* in alpha. All classes are
+located in the *org.apache.flink.tez* package.
+
+## Why Flink on Tez
+
+[Apache Tez](http://tez.apache.org) is a scalable data processing
+platform. Tez provides an API for specifying a directed acyclic
+graph (DAG), and functionality for placing the DAG vertices in YARN
+containers, as well as data shuffling. In Flink's architecture,
+Tez is at about the same level as Flink's network stack. While Flink's
+network stack focuses heavily on low latency in order to support
+pipelining, data streaming, and iterative algorithms, Tez
+focuses on scalability and elastic resource usage.
+
+Thus, by replacing Flink's network stack with Tez, users can get scalability
+and elastic resource usage in shared clusters while retaining Flink's
+APIs, optimizer, and runtime algorithms (local sorts, hash tables, etc).
+
+Flink programs can run almost unmodified using Tez as an execution
+environment. Tez supports local execution (e.g., for debugging), and
+remote execution on YARN.
+
+
+## Local execution
+
+The `LocalTezEnvironment` can be used to run programs using the local
+mode provided by Tez. This is for example WordCount using Tez local mode.
+It is identical to a normal Flink WordCount, except that the `LocalTezEnvironment` is used.
+To run in local Tez mode, you can simply run a Flink on Tez program
+from your IDE (e.g., right click and run).
+
+{% highlight java %}
+public class WordCountExample {
+ public static void main(String[] args) throws Exception {
+ final LocalTezEnvironment env = LocalTezEnvironment.create();
+
+ DataSet<String> text = env.fromElements(
+ "Who's there?",
+ "I think I hear them. Stand, ho! Who's there?");
+
+ DataSet<Tuple2<String, Integer>> wordCounts = text
+ .flatMap(new LineSplitter())
+ .groupBy(0)
+ .sum(1);
+
+ wordCounts.print();
+
+ env.execute("Word Count Example");
+ }
+
+ public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+ @Override
+ public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
+ for (String word : line.split(" ")) {
+ out.collect(new Tuple2<String, Integer>(word, 1));
+ }
+ }
+ }
+}
+{% endhighlight %}
+
+## YARN execution
+
+### Setup
+
+- Install Tez on your Hadoop 2 cluster following the instructions from the
+ [Apache Tez website](http://tez.apache.org/install.html). If you are able to run
+ the examples that ship with Tez, then Tez has been successfully installed.
+
+- Currently, you need to build Flink yourself to obtain Flink on Tez
+ (the reason is Hadoop version compatibility: Tez releases artifacts
+ on Maven central with a Hadoop 2.6.0 dependency). Build Flink
+ using `mvn -DskipTests clean package -Pinclude-tez -Dhadoop.version=X.X.X -Dtez.version=X.X.X`.
+ Make sure that the Hadoop version matches the version that Tez uses.
+ Obtain the jar file contained in the Flink distribution under
+ `flink-staging/flink-tez/target/flink-tez-x.y.z-flink-fat-jar.jar`
+ and upload it to some directory in HDFS. E.g., to upload the file
+ to the directory `/apps`, execute
+ {% highlight bash %}
+ $ hadoop fs -put /path/to/flink-tez-x.y.z-flink-fat-jar.jar /apps
+ {% endhighlight %}
+
+- Edit the tez-site.xml configuration file, adding an entry that points to the
+ location of the file. E.g., assuming that the file is in the directory `/apps/`,
+ add the following entry to tez-site.xml
+ ~~~<property>
+ <name>tez.aux.uris</name>
+ <value>${fs.default.name}/apps/flink-tez-x.y.z-flink-fat-jar.jar</value>
+ </property>
+ ~~~
+
+- At this point, you should be able to run the pre-packaged examples, e.g., run WordCount as:
+ {% highlight bash %}
+ $ hadoop jar /path/to/flink-tez-x.y.z-flink-fat-jar.jar wc hdfs:/path/to/text hdfs:/path/to/output
+ {% endhighlight %}
+
+
+### Packaging your program
+
+Application packaging is currently a bit different than in Flink standalone mode.
+ Flink programs that run on Tez need to be packaged in a "fat jar"
+ file that contains the Flink client and is executed via the `hadoop jar` command.
+ An easy way to do that is to use the provided `flink-tez-quickstart` maven archetype.
+ Create a new project as
+
+ {% highlight bash %}
+ $ mvn archetype:generate \
+ -DarchetypeGroupId=org.apache.flink \
+ -DarchetypeArtifactId=flink-tez-quickstart \
+ -DarchetypeVersion={{site.FLINK_VERSION_SHORT}}
+ {% endhighlight %}
+
+ and specify the group id, artifact id, version, and package of your project. For example,
+ let us assume the following options: `org.myorganization`, `flink-on-tez`, `0.1`, and `org.myorganization`.
+ You should see the following output on your terminal:
+
+ {% highlight bash %}
+ $ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-tez-quickstart
+ [INFO] Scanning for projects...
+ [INFO]
+ [INFO] ------------------------------------------------------------------------
+ [INFO] Building Maven Stub Project (No POM) 1
+ [INFO] ------------------------------------------------------------------------
+ [INFO]
+ [INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) > generate-sources @ standalone-pom >>>
+ [INFO]
+ [INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) < generate-sources @ standalone-pom <<<
+ [INFO]
+ [INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
+ [INFO] Generating project in Interactive mode
+ [INFO] Archetype [org.apache.flink:flink-tez-quickstart:0.9-SNAPSHOT] found in catalog local
+ Define value for property 'groupId': : org.myorganization
+ Define value for property 'artifactId': : flink-on-tez
+ Define value for property 'version': 1.0-SNAPSHOT: : 0.1
+ Define value for property 'package': org.myorganization: :
+ Confirm properties configuration:
+ groupId: org.myorganization
+ artifactId: flink-on-tez
+ version: 0.1
+ package: org.myorganization
+ Y: : Y
+ [INFO] ----------------------------------------------------------------------------
+ [INFO] Using following parameters for creating project from Archetype: flink-tez-quickstart:0.9-SNAPSHOT
+ [INFO] ----------------------------------------------------------------------------
+ [INFO] Parameter: groupId, Value: org.myorganization
+ [INFO] Parameter: artifactId, Value: flink-on-tez
+ [INFO] Parameter: version, Value: 0.1
+ [INFO] Parameter: package, Value: org.myorganization
+ [INFO] Parameter: packageInPathFormat, Value: org/myorganization
+ [INFO] Parameter: package, Value: org.myorganization
+ [INFO] Parameter: version, Value: 0.1
+ [INFO] Parameter: groupId, Value: org.myorganization
+ [INFO] Parameter: artifactId, Value: flink-on-tez
+ [INFO] project created from Archetype in dir: /Users/kostas/Dropbox/flink-tez-quickstart-test/flink-on-tez
+ [INFO] ------------------------------------------------------------------------
+ [INFO] BUILD SUCCESS
+ [INFO] ------------------------------------------------------------------------
+ [INFO] Total time: 44.130 s
+ [INFO] Finished at: 2015-02-26T17:59:45+01:00
+ [INFO] Final Memory: 15M/309M
+ [INFO] ------------------------------------------------------------------------
+ {% endhighlight %}
+
+ The project contains an example called `YarnJob.java` that provides the skeleton
+ for a Flink-on-Tez job. Program execution is currently done via Hadoop's `ProgramDriver`,
+ see the `Driver.java` class for an example. Create the fat jar using
+ `mvn -DskipTests clean package`. The resulting jar will be located in the `target/` directory.
+ You can now execute a job as follows:
+
+ {% highlight bash %}
+$ mvn -DskipTests clean package
+$ hadoop jar flink-on-tez/target/flink-on-tez-0.1-flink-fat-jar.jar yarnjob [command-line parameters]
+ {% endhighlight %}
+
+ Flink programs that run on YARN using Tez as an execution engine need to use the `RemoteTezEnvironment` and
+ register the class that contains the `main` method with that environment:
+ {% highlight java %}
+ public class WordCountExample {
+ public static void main(String[] args) throws Exception {
+ final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+
+ DataSet<String> text = env.fromElements(
+ "Who's there?",
+ "I think I hear them. Stand, ho! Who's there?");
+
+ DataSet<Tuple2<String, Integer>> wordCounts = text
+ .flatMap(new LineSplitter())
+ .groupBy(0)
+ .sum(1);
+
+ wordCounts.print();
+
+ env.registerMainClass(WordCountExample.class);
+ env.execute("Word Count Example");
+ }
+
+ public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+ @Override
+ public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
+ for (String word : line.split(" ")) {
+ out.collect(new Tuple2<String, Integer>(word, 1));
+ }
+ }
+ }
+ }
+ {% endhighlight %}
+
+
+## How it works
+
+Flink on Tez reuses the Flink APIs, the Flink optimizer,
+and the Flink local runtime, including Flink's hash table and sort implementations. Tez
+replaces Flink's network stack and control plane, and is responsible for scheduling and
+network shuffles.
+
+The figure below shows how a Flink program passes through the Flink stack and generates
+a Tez DAG (instead of a JobGraph that would be created using normal Flink execution).
+
+<div style="text-align: center;">
+<img src="img/flink_on_tez_translation.png" alt="Translation of a Flink program to a Tez DAG." height="600px" vspace="20px" style="text-align: center;"/>
+</div>
+
+All local processing, including memory management, sorting, and hashing is performed by
+Flink as usual. Local processing is encapsulated in Tez vertices, as seen in the figure
+below. Tez vertices are connected by edges. Tez is currently based on a key-value data
+model. In the current implementation, the elements that are processed by Flink operators
+are wrapped inside Tez values, and the Tez key field is used to indicate the index of the target task
+that the elements are destined to.
+
+<div style="text-align: center;">
+<img src="img/flink_tez_vertex.png" alt="Encapsulation of Flink runtime inside Tez vertices." height="200px" vspace="20px" style="text-align: center;"/>
+</div>
+
+## Limitations
+
+Currently, Flink on Tez does not support all features of the Flink API. We are working
+to enable all of the missing features listed below. In the meantime, if your project depends on these features, we suggest
+to use [Flink on YARN]({{site.baseurl}}/yarn_setup.html) or [Flink standalone]({{site.baseurl}}/setup_quickstart.html).
+
+The following features are currently missing.
+
+- Dedicated client: jobs need to be submitted via Hadoop's command-line client
+
+- Self-joins: currently binary operators that receive the same input are not supported due to
+ [TEZ-1190](https://issues.apache.org/jira/browse/TEZ-1190).
+
+- Iterative programs are currently not supported.
+
+- Broadcast variables are currently not supported.
+
+- Accumulators and counters are currently not supported.
+
+- Performance: The current implementation has not been heavily tested for performance, and misses several optimizations,
+ including task chaining.
+
+- Streaming API: Streaming programs will not currently compile to Tez DAGs.
+
+- Scala API: The current implementation has only been tested with the Java API.
+
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/img/flink_on_tez_translation.png
----------------------------------------------------------------------
diff --git a/docs/img/flink_on_tez_translation.png b/docs/img/flink_on_tez_translation.png
new file mode 100644
index 0000000..88fa4d5
Binary files /dev/null and b/docs/img/flink_on_tez_translation.png differ
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/docs/img/flink_tez_vertex.png
----------------------------------------------------------------------
diff --git a/docs/img/flink_tez_vertex.png b/docs/img/flink_tez_vertex.png
new file mode 100644
index 0000000..b469862
Binary files /dev/null and b/docs/img/flink_tez_vertex.png differ
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
index d171611..727cb47 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
@@ -21,7 +21,7 @@ import org.apache.flink.api.scala._
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
import org.apache.flink.util.Collector
-object TransitiveClosureNaive {
+object TransitiveClosureNaive {
def main (args: Array[String]): Unit = {
if (!parseParameters(args)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/pom.xml b/flink-quickstart/flink-tez-quickstart/pom.xml
new file mode 100644
index 0000000..3e41e54
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/pom.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-quickstart</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-tez-quickstart</artifactId>
+ <packaging>maven-archetype</packaging>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java b/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java
new file mode 100644
index 0000000..c7749ff
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/java/Dummy.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.quickstart;
+
+/**
+ * This class solely exists to generate
+ * javadocs for the "quickstart-java" project.
+ **/
+public class Dummy {
+ //
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml
new file mode 100644
index 0000000..adb3b83
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/META-INF/maven/archetype-metadata.xml
@@ -0,0 +1,36 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<archetype-descriptor
+ xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
+ name="flink-tez-quickstart">
+ <fileSets>
+ <fileSet filtered="true" packaged="true" encoding="UTF-8">
+ <directory>src/main/java</directory>
+ <includes>
+ <include>**/*.java</include>
+ </includes>
+ </fileSet>
+ <fileSet encoding="UTF-8">
+ <directory>src/assembly</directory>
+ </fileSet>
+ </fileSets>
+</archetype-descriptor>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml
new file mode 100644
index 0000000..180077d
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/pom.xml
@@ -0,0 +1,186 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>${groupId}</groupId>
+ <artifactId>${artifactId}</artifactId>
+ <version>${version}</version>
+ <packaging>jar</packaging>
+
+ <name>Your Job's Name</name>
+ <url>http://www.myorganization.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>apache.snapshots</id>
+ <name>Apache Development Snapshot Repository</name>
+ <url>https://repository.apache.org/content/repositories/snapshots/</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <!-- These two requirements are the minimum to use and develop Flink.
+ You can add others like <artifactId>flink-scala</artifactId> for Scala! -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tez</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <!-- We use the maven-assembly plugin to create a fat jar that contains all dependencies
+ except flink and it's transitive dependencies. The resulting fat-jar can be executed
+ on a cluster. Change the value of Program-Class if your program entry point changes. -->
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assembly/flink-fat-jar.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifest>
+ <mainClass>${package}.Driver</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- Configure the jar plugin to add the main class as a manifest entry -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Main-Class>${package}.Driver</Main-Class>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.1</version>
+ <configuration>
+ <source>1.6</source> <!-- If you want to use Java 8, change this to "1.8" -->
+ <target>1.6</target> <!-- If you want to use Java 8, change this to "1.8" -->
+ </configuration>
+ </plugin>
+ </plugins>
+
+
+ <!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
+ <!--
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ <compilerId>jdt</compilerId>
+ </configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.tycho</groupId>
+ <artifactId>tycho-compiler-jdt</artifactId>
+ <version>0.21.0</version>
+ </dependency>
+ </dependencies>
+ </plugin>
+
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <versionRange>[2.4,)</versionRange>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.1,)</versionRange>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ -->
+
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml
new file mode 100644
index 0000000..f9f27e5
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/assembly/flink-fat-jar.xml
@@ -0,0 +1,40 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>flink-fat-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <excludes>
+ <exclude>org.apache.tez:*</exclude>
+ <exclude>org.apache.hadoop:*</exclude>
+ </excludes>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java
new file mode 100644
index 0000000..5431849
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/Driver.java
@@ -0,0 +1,115 @@
+#set($hash = '#')
+
+package ${package};
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.util.ProgramDriver;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.client.VertexStatus;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+import java.util.EnumSet;
+import java.util.Set;
+
+public class Driver {
+
+ private static final DecimalFormat formatter = new DecimalFormat("${hash}${hash}${hash}.${hash}${hash}%");
+
+ public static void main(String [] args){
+ int exitCode = -1;
+ ProgramDriver pgd = new ProgramDriver();
+ try {
+ pgd.addClass("yarnjob", YarnJob.class,
+ "Yarn Job");
+ pgd.addClass("wc", YarnWordCount.class,
+ "Word Count");
+ exitCode = pgd.run(args);
+ } catch(Throwable e){
+ e.printStackTrace();
+ }
+ System.exit(exitCode);
+ }
+
+ public static void printDAGStatus(DAGClient dagClient, String[] vertexNames)
+ throws IOException, TezException {
+ printDAGStatus(dagClient, vertexNames, false, false);
+ }
+
+ public static void printDAGStatus(DAGClient dagClient, String[] vertexNames, boolean displayDAGCounters, boolean displayVertexCounters)
+ throws IOException, TezException {
+ Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+ DAGStatus dagStatus = dagClient.getDAGStatus(
+ (displayDAGCounters ? opts : null));
+ Progress progress = dagStatus.getDAGProgress();
+ double vProgressFloat = 0.0f;
+ if (progress != null) {
+ System.out.println("");
+ System.out.println("DAG: State: "
+ + dagStatus.getState()
+ + " Progress: "
+ + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) :
+ formatter.format((double)(progress.getSucceededTaskCount())
+ /progress.getTotalTaskCount())));
+ for (String vertexName : vertexNames) {
+ VertexStatus vStatus = dagClient.getVertexStatus(vertexName,
+ (displayVertexCounters ? opts : null));
+ if (vStatus == null) {
+ System.out.println("Could not retrieve status for vertex: "
+ + vertexName);
+ continue;
+ }
+ Progress vProgress = vStatus.getProgress();
+ if (vProgress != null) {
+ vProgressFloat = 0.0f;
+ if (vProgress.getTotalTaskCount() == 0) {
+ vProgressFloat = 1.0f;
+ } else if (vProgress.getTotalTaskCount() > 0) {
+ vProgressFloat = (double)vProgress.getSucceededTaskCount()
+ /vProgress.getTotalTaskCount();
+ }
+ System.out.println("VertexStatus:"
+ + " VertexName: "
+ + (vertexName.equals("ivertex1") ? "intermediate-reducer"
+ : vertexName)
+ + " Progress: " + formatter.format(vProgressFloat));
+ }
+ if (displayVertexCounters) {
+ TezCounters counters = vStatus.getVertexCounters();
+ if (counters != null) {
+ System.out.println("Vertex Counters for " + vertexName + ": "
+ + counters);
+ }
+ }
+ }
+ }
+ if (displayDAGCounters) {
+ TezCounters counters = dagStatus.getDAGCounters();
+ if (counters != null) {
+ System.out.println("DAG Counters: " + counters);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
new file mode 100644
index 0000000..cf7474e
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
@@ -0,0 +1,72 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.tez.client.LocalTezEnvironment;
+
+/**
+ * Skeleton for a Flink on Tez Job running using Tez local mode.
+ *
+ * For a full example of a Flink on Tez job, see the LocalWordCount.java file in the
+ * same package/directory or have a look at the website.
+ *
+ * You can also generate a .jar file that you can submit on your Flink
+ * cluster.
+ * Just type
+ * mvn clean package
+ * in the project's root directory.
+ * You will find the jar in
+ * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
+ *
+ */
+public class LocalJob {
+
+ public static void main(String[] args) throws Exception {
+ // set up the execution environment
+
+ // To use Tez YARN execution, use
+ final LocalTezEnvironment env = LocalTezEnvironment.create();
+
+ /**
+ * Here, you can start creating your execution plan for Flink.
+ *
+ * Start with getting some data from the environment, like
+ * env.readTextFile(textPath);
+ *
+ * then, transform the resulting DataSet<String> using operations
+ * like
+ * .filter()
+ * .flatMap()
+ * .join()
+ * .coGroup()
+ * and many more.
+ * Have a look at the programming guide for the Java API:
+ *
+ * http://flink.apache.org/docs/latest/programming_guide.html
+ *
+ * and the examples
+ *
+ * http://flink.apache.org/docs/latest/examples.html
+ *
+ */
+
+ // execute program
+ env.execute("Flink Java API Skeleton");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java
new file mode 100644
index 0000000..dbe81a7
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalWordCount.java
@@ -0,0 +1,96 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.tez.client.LocalTezEnvironment;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over some sample data.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ */
+public class LocalWordCount {
+
+ //
+ // Program
+ //
+
+ public static void main(String[] args) throws Exception {
+
+ // set up the execution environment
+ final LocalTezEnvironment env = LocalTezEnvironment.create();
+
+ // get input data
+ DataSet<String> text = env.fromElements(
+ "To be, or not to be,--that is the question:--",
+ "Whether 'tis nobler in the mind to suffer",
+ "The slings and arrows of outrageous fortune",
+ "Or to take arms against a sea of troubles,"
+ );
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new LineSplitter())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0)
+ .sum(1);
+
+ // emit result
+ counts.print();
+
+ env.execute("WordCount Example");
+ }
+
+ //
+ // User Functions
+ //
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ */
+ public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
new file mode 100644
index 0000000..51627d5
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
@@ -0,0 +1,75 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.tez.client.RemoteTezEnvironment;
+
+/**
+ * Skeleton for a Flink on Tez program running on Yarn.
+ *
+ * For a full example of a Flink on Tez program, see the YarnWordCount.java file in the
+ * same package/directory or have a look at the website.
+ *
+ * You can also generate a .jar file that you can submit on your Flink
+ * cluster.
+ * Just type
+ * mvn clean package
+ * in the project's root directory.
+ * You will find the jar in
+ * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
+ *
+ */
+public class YarnJob {
+
+ public static void main(String[] args) throws Exception {
+ // set up the execution environment
+
+ // To use Tez YARN execution, use
+ final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+ env.setDegreeOfParallelism(8);
+
+ /**
+ * Here, you can start creating your execution plan for Flink.
+ *
+ * Start with getting some data from the environment, like
+ * env.readTextFile(textPath);
+ *
+ * then, transform the resulting DataSet<String> using operations
+ * like
+ * .filter()
+ * .flatMap()
+ * .join()
+ * .coGroup()
+ * and many more.
+ * Have a look at the programming guide for the Java API:
+ *
+ * http://flink.apache.org/docs/latest/programming_guide.html
+ *
+ * and the examples
+ *
+ * http://flink.apache.org/docs/latest/examples.html
+ *
+ */
+
+
+ // execute program
+ env.registerMainClass(YarnJob.class);
+ env.execute("Flink Java API Skeleton");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
new file mode 100644
index 0000000..e97dc0b
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnWordCount.java
@@ -0,0 +1,124 @@
+package ${package};
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ import org.apache.flink.api.java.DataSet;
+ import org.apache.flink.tez.client.RemoteTezEnvironment;
+ import org.apache.flink.api.common.functions.FlatMapFunction;
+ import org.apache.flink.api.java.tuple.Tuple2;
+ import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence histogram
+ * over some sample data.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ */
+public class YarnWordCount {
+
+ //
+ // Program
+ //
+
+ public static void main(String[] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ final RemoteTezEnvironment env = RemoteTezEnvironment.create();
+ env.setDegreeOfParallelism(parallelism);
+
+ // get input data
+ DataSet<String> text = env.readTextFile(textPath);
+
+ DataSet<Tuple2<String, Integer>> counts =
+ // split up the lines in pairs (2-tuples) containing: (word,1)
+ text.flatMap(new LineSplitter())
+ // group by the tuple field "0" and sum up tuple field "1"
+ .groupBy(0)
+ .sum(1);
+
+ // emit result
+ counts.writeAsCsv(outputPath, "\n", " ");
+
+ // execute program
+ env.registerMainClass (YarnWordCount.class);
+ env.execute("WordCount Example");
+ }
+
+ //
+ // User Functions
+ //
+
+ /**
+ * Implements the string tokenizer that splits sentences into words as a user-defined
+ * FlatMapFunction. The function takes a line (String) and splits it into
+ * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+ */
+ public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+ private static boolean fileOutput = false;
+ private static String textPath;
+ private static String outputPath;
+ private static int parallelism;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+ if(args.length == 3) {
+ textPath = args[0];
+ outputPath = args[1];
+ parallelism = Integer.parseInt(args[2]);
+ } else {
+ System.err.println("Usage: YarnWordCount <text path> <result path> <parallelism>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing WordCount example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from a file.");
+ System.out.println(" Usage: YarnWordCount <text path> <result path> <parallelism>");
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties
new file mode 100644
index 0000000..bfce480
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/archetype.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+groupId=org.apache.flink.archetypetest
+artifactId=testArtifact
+version=0.1
+package=org.apache.flink.archetypetest
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt
new file mode 100644
index 0000000..f8808ba
--- /dev/null
+++ b/flink-quickstart/flink-tez-quickstart/src/test/resources/projects/testArtifact/goal.txt
@@ -0,0 +1 @@
+compile
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-quickstart/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 4f3b4b2..dd9621c 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -74,4 +74,14 @@ under the License.
</plugin>
</plugins>
</build>
+
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <profile>
+ <id>include-tez</id>
+ <modules>
+ <module>flink-tez-quickstart</module>
+ </modules>
+ </profile>
+ </profiles>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 89cf98a..6c97097 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -56,7 +57,7 @@ import org.apache.flink.util.InstantiationUtil;
/**
* Configuration class which stores all relevant parameters required to set up the Pact tasks.
*/
-public class TaskConfig {
+public class TaskConfig implements Serializable {
private static final String TASK_NAME = "taskname";
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/pom.xml b/flink-staging/flink-tez/pom.xml
new file mode 100644
index 0000000..51bc551
--- /dev/null
+++ b/flink-staging/flink-tez/pom.xml
@@ -0,0 +1,224 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-staging</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-tez</artifactId>
+ <name>flink-tez</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-optimizer</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-common</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-dag</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-library</artifactId>
+ <version>${tez.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.4</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4.1</version>
+ <configuration>
+ <descriptors>
+ <descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor>
+ </descriptors>
+ <archive>
+ <manifest>
+ <mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <!--<id>assemble-all</id>-->
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml b/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
new file mode 100644
index 0000000..504761a
--- /dev/null
+++ b/flink-staging/flink-tez/src/assembly/flink-fat-jar.xml
@@ -0,0 +1,42 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <id>flink-fat-jar</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <!--<excludes>
+ <exclude>org.apache.flink:*</exclude>
+ </excludes>-->
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <excludes>
+ <exclude>com.google.guava:guava</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
new file mode 100644
index 0000000..6d1b1c7
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+/**
+ * An {@link ExecutionEnvironment} that executes Flink programs on an
+ * in-process Tez instance (Tez local mode). Intended for testing and
+ * debugging programs without a YARN cluster.
+ */
+public class LocalTezEnvironment extends ExecutionEnvironment {
+
+	// Translates the optimized Flink plan into a Tez DAG and runs it.
+	private final TezExecutor executor;
+	// Flink optimizer used to compile the program plan.
+	private final Optimizer compiler;
+
+	private LocalTezEnvironment() {
+		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+		executor = new TezExecutor(compiler, this.getParallelism());
+	}
+
+	/**
+	 * Creates a new local Tez execution environment.
+	 */
+	public static LocalTezEnvironment create() {
+		return new LocalTezEnvironment();
+	}
+
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		TezConfiguration tezConf = new TezConfiguration();
+		// Run Tez in-process against the local file system and fetch shuffle
+		// data directly rather than over the network.
+		tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+		tezConf.set("fs.defaultFS", "file:///");
+		tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+		executor.setConfiguration(tezConf);
+		return executor.executePlan(createProgramPlan(jobName));
+	}
+
+	@Override
+	public String getExecutionPlan() throws Exception {
+		Plan p = createProgramPlan(null, false);
+		return executor.getOptimizerPlanAsJSON(p);
+	}
+
+	/**
+	 * Installs this environment as the context environment, so that
+	 * {@code ExecutionEnvironment.getExecutionEnvironment()} returns it.
+	 */
+	public void setAsContext() {
+		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+				return LocalTezEnvironment.this;
+			}
+		};
+		initializeContextEnvironment(factory);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
new file mode 100644
index 0000000..b155527
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.ToolRunner;
+
+
+/**
+ * An {@link ExecutionEnvironment} that submits Flink programs to a remote
+ * Tez installation (e.g. on YARN) via Hadoop's {@link ToolRunner}.
+ */
+public class RemoteTezEnvironment extends ExecutionEnvironment {
+
+	private static final Log LOG = LogFactory.getLog(RemoteTezEnvironment.class);
+
+	private final Optimizer compiler;
+	private final TezExecutor executor;
+	// Jar to ship to the cluster; null if none was registered.
+	private Path jarPath = null;
+
+	/**
+	 * Records the jar that contains the given main class, so it can be
+	 * shipped to the cluster as a task-local resource on submission.
+	 */
+	public void registerMainClass(Class<?> mainClass) {
+		jarPath = new Path(ClassUtil.findContainingJar(mainClass));
+		LOG.info("Registering main class " + mainClass.getName() + " contained in " + jarPath.toString());
+	}
+
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		TezExecutorTool tool = new TezExecutorTool(executor, createProgramPlan());
+		if (jarPath != null) {
+			tool.setJobJar(jarPath);
+		}
+		// Let failures propagate. The previous version returned from a
+		// finally block, which silently discarded any exception thrown by
+		// ToolRunner.run and ignored its exit code.
+		int executionResult = ToolRunner.run(new Configuration(), tool, new String[]{jobName});
+		if (executionResult != 0) {
+			throw new RuntimeException("Job " + jobName + " failed with exit code " + executionResult);
+		}
+		// Net runtime and accumulator results are not reported by the Tez executor yet.
+		return new JobExecutionResult(null, -1, null);
+	}
+
+	@Override
+	public String getExecutionPlan() throws Exception {
+		Plan p = createProgramPlan(null, false);
+		return executor.getOptimizerPlanAsJSON(p);
+	}
+
+	public static RemoteTezEnvironment create() {
+		return new RemoteTezEnvironment();
+	}
+
+	public RemoteTezEnvironment() {
+		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
+		executor = new TezExecutor(compiler, this.getParallelism());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
new file mode 100644
index 0000000..a54724f
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.tez.dag.TezDAGGenerator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TezExecutor extends PlanExecutor {
+
+ private static final Log LOG = LogFactory.getLog(TezExecutor.class);
+
+ private TezConfiguration tezConf;
+ private Optimizer compiler;
+
+ private Path jarPath;
+
+ private long runTime = -1; //TODO get DAG execution time from Tez
+ private int parallelism;
+
+ public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int parallelism) {
+ this.tezConf = tezConf;
+ this.compiler = compiler;
+ this.parallelism = parallelism;
+ }
+
+ public TezExecutor(Optimizer compiler, int parallelism) {
+ this.tezConf = null;
+ this.compiler = compiler;
+ this.parallelism = parallelism;
+ }
+
+ public void setConfiguration (TezConfiguration tezConf) {
+ this.tezConf = tezConf;
+ }
+
+ private JobExecutionResult executePlanWithConf (TezConfiguration tezConf, Plan plan) throws Exception {
+
+ String jobName = plan.getJobName();
+
+ TezClient tezClient = TezClient.create(jobName, tezConf);
+ tezClient.start();
+ try {
+ OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
+ TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
+ DAG dag = dagGenerator.createDAG(optPlan);
+
+ if (jarPath != null) {
+ addLocalResource(tezConf, jarPath, dag);
+ }
+
+ tezClient.waitTillReady();
+ LOG.info("Submitting DAG to Tez Client");
+ DAGClient dagClient = tezClient.submitDAG(dag);
+
+ LOG.info("Submitted DAG to Tez Client");
+
+ // monitoring
+ DAGStatus dagStatus = dagClient.waitForCompletion();
+
+ if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+ LOG.error (jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
+ throw new RuntimeException(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
+ }
+ LOG.info(jobName + " finished successfully");
+
+ return new JobExecutionResult(null, runTime, null);
+
+ }
+ finally {
+ tezClient.stop();
+ }
+ }
+
+ @Override
+ public JobExecutionResult executePlan(Plan plan) throws Exception {
+ return executePlanWithConf(tezConf, plan);
+ }
+
+ private static void addLocalResource (TezConfiguration tezConf, Path jarPath, DAG dag) {
+
+ try {
+ org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(tezConf);
+
+ LOG.info("Jar path received is " + jarPath.toString());
+
+ String jarFile = jarPath.getName();
+
+ Path remoteJarPath = null;
+
+ /*
+ if (tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR) == null) {
+ LOG.info("Tez staging directory is null, setting it.");
+ Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+ LOG.info("Setting Tez staging directory to " + stagingDir.toString());
+ tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+ LOG.info("Set Tez staging directory to " + stagingDir.toString());
+ }
+ Path stagingDir = new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR));
+ LOG.info("Ensuring that Tez staging directory exists");
+ TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+ LOG.info("Tez staging directory exists and is " + stagingDir.toString());
+ */
+
+
+ Path stagingDir = TezCommonUtils.getTezBaseStagingPath(tezConf);
+ LOG.info("Tez staging path is " + stagingDir);
+ TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+ LOG.info("Tez staging dir exists");
+
+ remoteJarPath = fs.makeQualified(new Path(stagingDir, jarFile));
+ LOG.info("Copying " + jarPath.toString() + " to " + remoteJarPath.toString());
+ fs.copyFromLocalFile(jarPath, remoteJarPath);
+
+
+ FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+ Credentials credentials = new Credentials();
+ TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, tezConf);
+
+ Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
+ LocalResource jobJar = LocalResource.newInstance(
+ ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+ remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
+ localResources.put(jarFile.toString(), jobJar);
+
+ dag.addTaskLocalFiles(localResources);
+
+ LOG.info("Added job jar as local resource.");
+ }
+ catch (Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ System.exit(-1);
+ }
+ }
+
+ public void setJobJar (Path jarPath) {
+ this.jarPath = jarPath;
+ }
+
+
+ @Override
+ public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
+ OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
+ PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+ return jsonGen.getOptimizerPlanAsJSON(optPlan);
+ }
+
+ public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
+ if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
+ p.setDefaultParallelism(parallelism);
+ }
+ return this.compiler.compile(p);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2cbbd328/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
new file mode 100644
index 0000000..09289fb
--- /dev/null
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.dag.api.TezConfiguration;
+
+
+/**
+ * A Hadoop {@link Tool} wrapper around {@link TezExecutor}, so that plan
+ * execution picks up the Hadoop configuration supplied by
+ * {@code ToolRunner} (generic options, cluster config, security settings).
+ */
+public class TezExecutorTool extends Configured implements Tool {
+
+	private static final Log LOG = LogFactory.getLog(TezExecutorTool.class);
+
+	private final TezExecutor executor;
+	private final Plan plan;
+	// Job jar to ship with the DAG; null if none was registered.
+	private Path jarPath = null;
+
+	public TezExecutorTool(TezExecutor executor, Plan plan) {
+		this.executor = executor;
+		this.plan = plan;
+	}
+
+	public void setJobJar(Path jarPath) {
+		this.jarPath = jarPath;
+	}
+
+	/**
+	 * Runs the wrapped plan on Tez.
+	 *
+	 * @return 0 on success
+	 * @throws RuntimeException (with the original failure as cause) if
+	 *         execution fails
+	 */
+	@Override
+	public int run(String[] args) throws Exception {
+
+		// Derive the Tez configuration from the Hadoop configuration that
+		// ToolRunner injected, if any.
+		Configuration conf = getConf();
+		TezConfiguration tezConf = (conf != null) ? new TezConfiguration(conf) : new TezConfiguration();
+
+		UserGroupInformation.setConfiguration(tezConf);
+
+		executor.setConfiguration(tezConf);
+
+		try {
+			if (jarPath != null) {
+				executor.setJobJar(jarPath);
+			}
+			executor.executePlan(plan);
+		}
+		catch (Exception e) {
+			LOG.error("Job execution failed due to: " + e.getMessage());
+			// Preserve the original exception as the cause instead of
+			// rethrowing only its message.
+			throw new RuntimeException("Job execution failed", e);
+		}
+		return 0;
+	}
+}