You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/30 21:34:19 UTC
git commit: TEZ-1620. Wait for application finish before stopping
MiniTezCluster (Jeff Zhang via bikas)
Repository: tez
Updated Branches:
refs/heads/master 14cf35b78 -> 08bafa987
TEZ-1620. Wait for application finish before stopping MiniTezCluster (Jeff Zhang via bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/08bafa98
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/08bafa98
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/08bafa98
Branch: refs/heads/master
Commit: 08bafa987a9ee468aa653ad3b3c171ce1322ea36
Parents: 14cf35b
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Sep 30 12:34:03 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 30 12:34:03 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 7 ++++
.../org/apache/tez/test/MiniTezCluster.java | 43 ++++++++++++++++++++
.../org/apache/tez/test/TestAMRecovery.java | 1 -
.../org/apache/tez/test/TestDAGRecovery.java | 1 -
.../org/apache/tez/test/TestDAGRecovery2.java | 2 -
.../java/org/apache/tez/test/TestTezJobs.java | 5 ---
6 files changed, 50 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f5d942d..618ae3b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,13 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+Release 0.5.2: Unreleased
+
+INCOMPATIBLE CHANGES
+
+ALL CHANGES:
+ TEZ-1620. Wait for application finish before stopping MiniTezCluster
+
Release 0.5.1: Unreleased
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
index 43a781c..ff6c5cd 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -21,6 +21,9 @@ package org.apache.tez.test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,9 @@ import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -45,6 +51,9 @@ import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
/**
* Configures and starts the Tez-specific components in the YARN cluster.
*
@@ -195,6 +204,40 @@ public class MiniTezCluster extends MiniYARNCluster {
+ conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
}
+ @Override
+ protected void serviceStop() throws Exception {
+ waitForAppsToFinish();
+ super.serviceStop();
+ }
+
+ private void waitForAppsToFinish() {
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(getConfig());
+ yarnClient.start();
+ try {
+ while(true) {
+ List<ApplicationReport> appReports = yarnClient.getApplications();
+ Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){
+ @Override
+ public boolean apply(ApplicationReport appReport) {
+ return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+ YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+ .contains(appReport.getYarnApplicationState());
+ }
+ });
+ if (unCompletedApps.size()==0){
+ break;
+ }
+ LOG.info("wait for applications to finish in MiniTezCluster");
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ yarnClient.stop();
+ }
+ }
+
public Path getConfigFilePath() {
return confFilePath;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index e79d562..1d3799f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -123,7 +123,6 @@ public class TestAMRecovery {
e.printStackTrace();
}
}
- Thread.sleep(10000);
if (miniTezCluster != null) {
try {
LOG.info("Stopping MiniTezCluster");
http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 60c1efc..5db17c3 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -105,7 +105,6 @@ public class TestDAGRecovery {
e.printStackTrace();
}
}
- Thread.sleep(10000);
if (miniTezCluster != null) {
try {
LOG.info("Stopping MiniTezCluster");
http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index b7957a3..4975e0f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -93,7 +93,6 @@ public class TestDAGRecovery2 {
e.printStackTrace();
}
}
- Thread.sleep(10000);
if (miniTezCluster != null) {
try {
LOG.info("Stopping MiniTezCluster");
@@ -149,7 +148,6 @@ public class TestDAGRecovery2 {
}
}
tezSession = null;
- Thread.sleep(10000);
}
void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 56f62a4..484ca7e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -101,11 +101,6 @@ public class TestTezJobs {
@AfterClass
public static void tearDown() {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- // Ignore
- }
if (mrrTezCluster != null) {
mrrTezCluster.stop();
mrrTezCluster = null;