You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/30 21:34:19 UTC

git commit: TEZ-1620. Wait for application finish before stopping MiniTezCluster (Jeff Zhang via bikas)

Repository: tez
Updated Branches:
  refs/heads/master 14cf35b78 -> 08bafa987


TEZ-1620. Wait for application finish before stopping MiniTezCluster (Jeff Zhang via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/08bafa98
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/08bafa98
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/08bafa98

Branch: refs/heads/master
Commit: 08bafa987a9ee468aa653ad3b3c171ce1322ea36
Parents: 14cf35b
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Sep 30 12:34:03 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 30 12:34:03 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  7 ++++
 .../org/apache/tez/test/MiniTezCluster.java     | 43 ++++++++++++++++++++
 .../org/apache/tez/test/TestAMRecovery.java     |  1 -
 .../org/apache/tez/test/TestDAGRecovery.java    |  1 -
 .../org/apache/tez/test/TestDAGRecovery2.java   |  2 -
 .../java/org/apache/tez/test/TestTezJobs.java   |  5 ---
 6 files changed, 50 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f5d942d..618ae3b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,13 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+Release 0.5.2: Unreleased
+
+INCOMPATIBLE CHANGES
+
+ALL CHANGES:
+  TEZ-1620. Wait for application finish before stopping MiniTezCluster
+
 Release 0.5.1: Unreleased
 
 INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
index 43a781c..ff6c5cd 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -21,6 +21,9 @@ package org.apache.tez.test;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,9 @@ import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
@@ -45,6 +51,9 @@ import org.apache.tez.dag.app.DAGAppMaster;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
 /**
  * Configures and starts the Tez-specific components in the YARN cluster.
  *
@@ -195,6 +204,40 @@ public class MiniTezCluster extends MiniYARNCluster {
         + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
   }
 
+  @Override
+  protected void serviceStop() throws Exception {
+    waitForAppsToFinish(); // block until no application is still NEW..RUNNING
+    super.serviceStop(); // only then run MiniYARNCluster's own stop logic
+  }
+
+  /** Polls YARN once a second until no application is in a non-terminal state. */
+  private void waitForAppsToFinish() {
+    final EnumSet<YarnApplicationState> liveStates = EnumSet.of(YarnApplicationState.NEW,
+        YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED,
+        YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING);
+    Predicate<ApplicationReport> isLive = new Predicate<ApplicationReport>() {
+      @Override
+      public boolean apply(ApplicationReport appReport) {
+        return liveStates.contains(appReport.getYarnApplicationState());
+      }
+    };
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(getConfig());
+    yarnClient.start();
+    try {
+      while (!Collections2.filter(yarnClient.getApplications(), isLive).isEmpty()) {
+        LOG.info("wait for applications to finish in MiniTezCluster");
+        Thread.sleep(1000);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // give up waiting but keep the interrupt visible
+    } catch (Exception e) {
+      LOG.warn("Failed while waiting for applications to finish", e); // best effort: still stop
+    } finally {
+      yarnClient.stop();
+    }
+  }
+  
   public Path getConfigFilePath() {
     return confFilePath;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index e79d562..1d3799f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -123,7 +123,6 @@ public class TestAMRecovery {
         e.printStackTrace();
       }
     }
-    Thread.sleep(10000);
     if (miniTezCluster != null) {
       try {
         LOG.info("Stopping MiniTezCluster");

http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
index 60c1efc..5db17c3 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery.java
@@ -105,7 +105,6 @@ public class TestDAGRecovery {
         e.printStackTrace();
       }
     }
-    Thread.sleep(10000);
     if (miniTezCluster != null) {
       try {
         LOG.info("Stopping MiniTezCluster");

http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
index b7957a3..4975e0f 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDAGRecovery2.java
@@ -93,7 +93,6 @@ public class TestDAGRecovery2 {
         e.printStackTrace();
       }
     }
-    Thread.sleep(10000);
     if (miniTezCluster != null) {
       try {
         LOG.info("Stopping MiniTezCluster");
@@ -149,7 +148,6 @@ public class TestDAGRecovery2 {
       }
     }
     tezSession = null;
-    Thread.sleep(10000);
   }
 
   void runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {

http://git-wip-us.apache.org/repos/asf/tez/blob/08bafa98/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 56f62a4..484ca7e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -101,11 +101,6 @@ public class TestTezJobs {
 
   @AfterClass
   public static void tearDown() {
-    try {
-      Thread.sleep(10000);
-    } catch (InterruptedException e) {
-      // Ignore
-    }
     if (mrrTezCluster != null) {
       mrrTezCluster.stop();
       mrrTezCluster = null;