Posted to issues@flink.apache.org by ktzoumas <gi...@git.apache.org> on 2014/11/07 18:54:57 UTC

[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

GitHub user ktzoumas opened a pull request:

    https://github.com/apache/incubator-flink/pull/189

    [FLINK-1219] Add support for Apache Tez as execution backend

    This allows Flink programs to be executed as Tez DAGs.
    
    Up for comments; not ready to merge yet.
    
    The change that makes hadoop2 the default profile will eventually move to a separate commit and issue.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ktzoumas/incubator-flink tez_support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #189
    
----
commit 7181959eb80a831a612d3a51c81a1976e309c9ae
Author: Kostas Tzoumas <ko...@kostass-mbp.fritz.box>
Date:   2014-10-03T09:49:39Z

    [FLINK-1219] Added support for Apache Tez as execution backend

commit 436150db8265f501e33a99df13662b86d31a0021
Author: Kostas Tzoumas <ko...@gmail.com>
Date:   2014-11-07T17:51:20Z

    Changed POM files to make hadoop2 the default profile

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r20072304
  
    --- Diff: flink-addons/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.tez.dag;
    +
    +
    +import org.apache.flink.tez.runtime.TezTaskConfig;
    +import org.apache.tez.dag.api.TezConfiguration;
    +import org.apache.tez.dag.api.Vertex;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.UUID;
    +
    +public abstract class FlinkVertex {
    +
    +	protected Vertex cached;
    +	private String taskName;
    +	private int parallelism;
    +	protected TezTaskConfig taskConfig;
    +
    +	// Tez-specific bookkeeping
    +	protected String uniqueName; //Unique name in DAG
    +	private Map<FlinkVertex,ArrayList<Integer>> inputPositions;
    +	private ArrayList<Integer> numberOfSubTasksInOutputs;
    +
    +	public TezTaskConfig getConfig() {
    +		return taskConfig;
    +	}
    +
    +	public FlinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
    +		this.cached = null;
    +		this.taskName = taskName;
    +		this.parallelism = parallelism;
    +		this.taskConfig = taskConfig;
    +		this.uniqueName = taskName + UUID.randomUUID().toString();
    +		this.inputPositions = new HashMap<FlinkVertex, ArrayList<Integer>>();
    +		this.numberOfSubTasksInOutputs = new ArrayList<Integer>();
    +	}
    +
    +	public int getParallelism () {
    +		return parallelism;
    +	}
    +
    +	public void setParallelism (int parallelism) {this.parallelism = parallelism;}
    --- End diff --
    
    Inconsistent getter/setter style: the getter spans several lines, but the setter is on one line. I think a one-line setter is uncommon in our code.
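
A minimal sketch of the consistent layout this comment seems to ask for. `VertexStyleSketch` is a hypothetical stand-in for `FlinkVertex`, reduced to the one field under discussion; it is not code from the pull request.

```java
// Hypothetical stand-in for FlinkVertex, reduced to the parallelism field.
class VertexStyleSketch {

	private int parallelism;

	VertexStyleSketch(int parallelism) {
		this.parallelism = parallelism;
	}

	public int getParallelism() {
		return parallelism;
	}

	// Setter laid out like the getter above: body on its own line.
	public void setParallelism(int parallelism) {
		this.parallelism = parallelism;
	}

	public static void main(String[] args) {
		VertexStyleSketch v = new VertexStyleSketch(1);
		v.setParallelism(4);
		System.out.println(v.getParallelism());
	}
}
```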



[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r20072130
  
    --- Diff: pom.xml ---
    @@ -71,7 +71,7 @@ under the License.
     		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     		<hadoop-one.version>1.2.1</hadoop-one.version>
    -		<hadoop-two.version>2.2.0</hadoop-two.version>
    +		<hadoop-two.version>2.4.0</hadoop-two.version>
    --- End diff --
    
    I would suggest using `2.4.1` as the hadoop2 version if we are moving to Hadoop 2.4 as the default Hadoop version.
    
    Is Hadoop 2.4.0 a strict requirement for the Tez support?
    I don't know whether our users are affected by this version bump (CDH 5.1 packages Hadoop 2.3.0).



[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r20072194
  
    --- Diff: flink-addons/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezExecutionEnvironment.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.tez.client;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.Plan;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.compiler.DataStatistics;
    +import org.apache.flink.compiler.PactCompiler;
    +import org.apache.flink.compiler.costs.DefaultCostEstimator;
    +import org.apache.flink.compiler.plan.OptimizedPlan;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.tez.dag.TezDAGGenerator;
    +import org.apache.tez.client.TezClient;
    +import org.apache.tez.dag.api.DAG;
    +import org.apache.tez.dag.api.TezConfiguration;
    +import org.apache.tez.dag.api.client.DAGClient;
    +import org.apache.tez.dag.api.client.DAGStatus;
    +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
    +
    +public class LocalTezExecutionEnvironment extends ExecutionEnvironment{
    +
    +	TezConfiguration tezConf;
    +
    +	private LocalTezExecutionEnvironment() {
    +		this.tezConf = new TezConfiguration();
    +		tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
    +		tezConf.set("fs.defaultFS", "file:///");
    +		tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
    +	}
    +
    +	public static LocalTezExecutionEnvironment create() {
    +		return new LocalTezExecutionEnvironment();
    +	}
    +
    +	@Override
    +	public JobExecutionResult execute(String jobName) throws Exception {
    +		try {
    +			TezClient tezClient = TezClient.create(jobName, tezConf);
    +
    +			tezClient.start();
    +
    +			try {
    +				Plan p = createProgramPlan(jobName);
    +				PactCompiler compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
    +				OptimizedPlan plan = compiler.compile(p);
    +				TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
    +				DAG dag = dagGenerator.createDAG(plan);
    +
    +				tezClient.waitTillReady();
    +				System.out.println("Submitting DAG to Tez Client");
    +				DAGClient dagClient = tezClient.submitDAG(dag);
    +
    +				System.out.println("Submitted DAG to Tez Client");
    +
    +				// monitoring
    +				DAGStatus dagStatus = dagClient.waitForCompletion();
    +
    +				if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
    +					System.out.println(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
    +					System.exit(1);
    +				}
    +				System.out.println(jobName + " finished successfully");
    +			} catch (Exception e) {
    +				e.printStackTrace();
    --- End diff --
    
    Rethrow this as a RuntimeException instead of only printing the stack trace.
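
A minimal sketch of the rethrow pattern, under stated assumptions: `RethrowSketch`, `submit`, and `execute` are hypothetical stand-ins for the environment's job submission path and do not use the Tez API. The point is only that the original exception is kept as the cause rather than swallowed.

```java
class RethrowSketch {

	// Hypothetical stand-in for the DAG submission step; it always fails here.
	static void submit(String jobName) throws Exception {
		throw new Exception("submission of " + jobName + " failed");
	}

	static void execute(String jobName) {
		try {
			submit(jobName);
		} catch (Exception e) {
			// Wrap instead of calling printStackTrace(): the caller sees the
			// failure, and the original exception is preserved as the cause.
			throw new RuntimeException("Job " + jobName + " failed", e);
		}
	}

	public static void main(String[] args) {
		try {
			execute("demo");
		} catch (RuntimeException e) {
			System.out.println(e.getMessage() + " / cause: " + e.getCause().getMessage());
		}
	}
}
```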



[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r20072188
  
    --- Diff: flink-addons/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezExecutionEnvironment.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.tez.client;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.Plan;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.compiler.DataStatistics;
    +import org.apache.flink.compiler.PactCompiler;
    +import org.apache.flink.compiler.costs.DefaultCostEstimator;
    +import org.apache.flink.compiler.plan.OptimizedPlan;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.tez.dag.TezDAGGenerator;
    +import org.apache.tez.client.TezClient;
    +import org.apache.tez.dag.api.DAG;
    +import org.apache.tez.dag.api.TezConfiguration;
    +import org.apache.tez.dag.api.client.DAGClient;
    +import org.apache.tez.dag.api.client.DAGStatus;
    +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
    +
    +public class LocalTezExecutionEnvironment extends ExecutionEnvironment{
    +
    +	TezConfiguration tezConf;
    +
    +	private LocalTezExecutionEnvironment() {
    +		this.tezConf = new TezConfiguration();
    +		tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
    +		tezConf.set("fs.defaultFS", "file:///");
    +		tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
    +	}
    +
    +	public static LocalTezExecutionEnvironment create() {
    +		return new LocalTezExecutionEnvironment();
    +	}
    +
    +	@Override
    +	public JobExecutionResult execute(String jobName) throws Exception {
    +		try {
    +			TezClient tezClient = TezClient.create(jobName, tezConf);
    +
    +			tezClient.start();
    +
    +			try {
    +				Plan p = createProgramPlan(jobName);
    +				PactCompiler compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
    +				OptimizedPlan plan = compiler.compile(p);
    +				TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
    +				DAG dag = dagGenerator.createDAG(plan);
    +
    +				tezClient.waitTillReady();
    +				System.out.println("Submitting DAG to Tez Client");
    +				DAGClient dagClient = tezClient.submitDAG(dag);
    +
    +				System.out.println("Submitted DAG to Tez Client");
    --- End diff --
    
    I think it's better to use a logging framework rather than `System.out` for messages like this.
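
A minimal sketch of routing these messages through a logger. It uses `java.util.logging` only to stay dependency-free; the project's own logging framework would take its place. `SubmitLoggingSketch` and its `submit` method are hypothetical stand-ins for the `tezClient.submitDAG(dag)` call and return a made-up handle.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class SubmitLoggingSketch {

	private static final Logger LOG = Logger.getLogger(SubmitLoggingSketch.class.getName());

	// Hypothetical stand-in for tezClient.submitDAG(dag); returns a fake handle.
	static String submit(String jobName) {
		LOG.info("Submitting DAG to Tez Client");
		String handle = "dag-" + jobName;
		// Parameterized message: the handle is formatted only if the record is published.
		LOG.log(Level.INFO, "Submitted DAG to Tez Client: {0}", handle);
		return handle;
	}

	public static void main(String[] args) {
		submit("demo");
	}
}
```

Unlike `System.out.println`, these messages can be filtered by level or redirected by configuration without touching the code.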



[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by ktzoumas <gi...@git.apache.org>.
GitHub user ktzoumas commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r21305376
  
    --- Diff: flink-addons/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.tez.examples;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.aggregation.Aggregations;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
    +import org.apache.flink.util.Collector;
    +
    +
    +public class ConnectedComponentsStep implements ProgramDescription {
    +
    +	// *************************************************************************
    +	//     PROGRAM
    +	// *************************************************************************
    +
    +	public static void main(String... args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		// set up execution environment
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    --- End diff --
    
    I left this as is so that the tests can create their own environment and the example does not overwrite it.



[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r20072452
  
    --- Diff: flink-addons/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java ---
    @@ -0,0 +1,201 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.tez.examples;
    +
    +import org.apache.flink.api.common.ProgramDescription;
    +import org.apache.flink.api.common.functions.FlatJoinFunction;
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.JoinFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.aggregation.Aggregations;
    +import org.apache.flink.api.java.functions.FunctionAnnotation;
    +import org.apache.flink.api.java.tuple.Tuple1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
    +import org.apache.flink.util.Collector;
    +
    +
    +public class ConnectedComponentsStep implements ProgramDescription {
    +
    +	// *************************************************************************
    +	//     PROGRAM
    +	// *************************************************************************
    +
    +	public static void main(String... args) throws Exception {
    +
    +		if(!parseParameters(args)) {
    +			return;
    +		}
    +
    +		// set up execution environment
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    --- End diff --
    
    TezExecutionEnv?



[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r20072153
  
    --- Diff: pom.xml ---
    @@ -215,6 +215,13 @@ under the License.
     				<artifactId>stax-api</artifactId>
     				<version>1.0.1</version>
     			</dependency>
    +
    +            <!-- Commons codec required by Tez for flink-tez -->
    --- End diff --
    
    I think you can remove the dependency here. It is already specified in flink-tez.



[GitHub] incubator-flink pull request: [FLINK-1219] Add support for Apache ...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/189#discussion_r20072219
  
    --- Diff: flink-addons/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezExecutionEnvironment.java ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.tez.client;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.Plan;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.compiler.DataStatistics;
    +import org.apache.flink.compiler.PactCompiler;
    +import org.apache.flink.compiler.costs.DefaultCostEstimator;
    +import org.apache.flink.compiler.plan.OptimizedPlan;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.tez.dag.TezDAGGenerator;
    +import org.apache.tez.client.TezClient;
    +import org.apache.tez.dag.api.DAG;
    +import org.apache.tez.dag.api.TezConfiguration;
    +import org.apache.tez.dag.api.client.DAGClient;
    +import org.apache.tez.dag.api.client.DAGStatus;
    +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
    +
    +public class LocalTezExecutionEnvironment extends ExecutionEnvironment{
    +
    +	TezConfiguration tezConf;
    +
    +	private LocalTezExecutionEnvironment() {
    +		this.tezConf = new TezConfiguration();
    +		tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
    +		tezConf.set("fs.defaultFS", "file:///");
    +		tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
    +	}
    +
    +	public static LocalTezExecutionEnvironment create() {
    +		return new LocalTezExecutionEnvironment();
    +	}
    +
    +	@Override
    +	public JobExecutionResult execute(String jobName) throws Exception {
    +		try {
    +			TezClient tezClient = TezClient.create(jobName, tezConf);
    +
    +			tezClient.start();
    +
    +			try {
    +				Plan p = createProgramPlan(jobName);
    +				PactCompiler compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
    +				OptimizedPlan plan = compiler.compile(p);
    +				TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
    +				DAG dag = dagGenerator.createDAG(plan);
    +
    +				tezClient.waitTillReady();
    +				System.out.println("Submitting DAG to Tez Client");
    +				DAGClient dagClient = tezClient.submitDAG(dag);
    +
    +				System.out.println("Submitted DAG to Tez Client");
    +
    +				// monitoring
    +				DAGStatus dagStatus = dagClient.waitForCompletion();
    +
    +				if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
    +					System.out.println(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
    +					System.exit(1);
    +				}
    +				System.out.println(jobName + " finished successfully");
    +			} catch (Exception e) {
    +				e.printStackTrace();
    +			} finally {
    +				tezClient.stop();
    +			}
    +		}
    +		catch (Exception e) {
    +			e.printStackTrace();
    +			System.exit(1);
    --- End diff --
    
    We shouldn't kill the JVM here.
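
A minimal sketch of one alternative to `System.exit(1)`: report the failure to the caller with an exception so the JVM keeps running. `NoExitSketch`, its `State` enum, and `JobFailedException` are hypothetical names introduced for illustration; they stand in for the `DAGStatus` check and are not part of Tez or Flink.

```java
class NoExitSketch {

	// Hypothetical stand-in for DAGStatus.State.
	enum State { SUCCEEDED, FAILED }

	// Hypothetical exception type, introduced only for this sketch.
	static class JobFailedException extends RuntimeException {
		JobFailedException(String message) {
			super(message);
		}
	}

	static void checkResult(String jobName, State state, String diagnostics) {
		if (state != State.SUCCEEDED) {
			// Signal the failure to the caller; the JVM is left running.
			throw new JobFailedException(jobName + " failed with diagnostics: " + diagnostics);
		}
	}

	public static void main(String[] args) {
		checkResult("ok-job", State.SUCCEEDED, "");
		try {
			checkResult("bad-job", State.FAILED, "vertex died");
		} catch (JobFailedException e) {
			System.out.println("caller can still react: " + e.getMessage());
		}
	}
}
```

This matters for an execution environment in particular, since it may be embedded in a test runner or a larger application that should survive a failed job.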

