You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@twill.apache.org by ch...@apache.org on 2017/04/04 09:10:06 UTC

twill git commit: (TWILL-180) Reflects YARN application completion status via TwillController

Repository: twill
Updated Branches:
  refs/heads/master c310b6945 -> cc79f0d0b


(TWILL-180) Reflects YARN application completion status via TwillController

This closes #54 on GitHub.

Signed-off-by: Terence Yim <ch...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/cc79f0d0
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/cc79f0d0
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/cc79f0d0

Branch: refs/heads/master
Commit: cc79f0d0b23c7394dba146868f769291f782122f
Parents: c310b69
Author: Terence Yim <ch...@apache.org>
Authored: Mon Apr 3 23:38:58 2017 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Tue Apr 4 02:09:59 2017 -0700

----------------------------------------------------------------------
 .../org/apache/twill/api/ServiceController.java | 32 +++++++++++++++++++-
 .../AbstractExecutionServiceController.java     | 12 ++++++++
 .../twill/internal/TwillContainerLauncher.java  |  7 +++++
 .../apache/twill/yarn/YarnTwillController.java  | 15 +++++++--
 .../apache/twill/yarn/TaskCompletedTestRun.java | 28 ++++++++++++++++-
 5 files changed, 90 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-api/src/main/java/org/apache/twill/api/ServiceController.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ServiceController.java b/twill-api/src/main/java/org/apache/twill/api/ServiceController.java
index 1ea86b2..bb46290 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ServiceController.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ServiceController.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 
 /**
  * This interface is for controlling a remote running service.
@@ -57,7 +58,8 @@ public interface ServiceController {
    * will be returned.
    *
    * @return a {@link Future} that represents the termination of the service. The future result will be
-   * this {@link ServiceController}. If the service terminated due to exception, the future will carry the exception.
+   * this {@link ServiceController}. If the service terminated with a {@link TerminationStatus#FAILED} status,
+   * calling the {@link Future#get()} on the returning future will throw {@link ExecutionException}.
    */
   Future<? extends ServiceController> terminate();
 
@@ -98,4 +100,32 @@ public interface ServiceController {
    * @throws ExecutionException if the service terminated due to exception.
    */
   void awaitTerminated(long timeout, TimeUnit timeoutUnit) throws TimeoutException, ExecutionException;
+
+  /**
+   * Gets the termination status of the application represented by this controller.
+   *
+   * @return the termination status or {@code null} if the application is still running
+   */
+  @Nullable
+  TerminationStatus getTerminationStatus();
+
+  /**
+   * Enum to represent termination status of the application when it completed.
+   */
+  enum TerminationStatus {
+    /**
+     * Application was completed successfully.
+     */
+    SUCCEEDED,
+
+    /**
+     * Application was killed explicitly.
+     */
+    KILLED,
+
+    /**
+     * Application failed.
+     */
+    FAILED
+  }
 }

http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
index 580a88f..3ea27fc 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractExecutionServiceController.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 
 /**
  * An abstract base class for implementing {@link ServiceController} that deal with Service state transition and
@@ -47,6 +48,7 @@ public abstract class AbstractExecutionServiceController implements ServiceContr
   private final ListenerExecutors listenerExecutors;
   private final Service serviceDelegate;
   private final SettableFuture<State> terminationFuture;
+  private volatile TerminationStatus terminationStatus;
 
   protected AbstractExecutionServiceController(RunId runId) {
     this.runId = runId;
@@ -87,6 +89,12 @@ public abstract class AbstractExecutionServiceController implements ServiceContr
     });
   }
 
+  @Nullable
+  @Override
+  public TerminationStatus getTerminationStatus() {
+    return terminationStatus;
+  }
+
   @Override
   public void onRunning(final Runnable runnable, Executor executor) {
     addListener(new ServiceListenerAdapter() {
@@ -168,6 +176,10 @@ public abstract class AbstractExecutionServiceController implements ServiceContr
     };
   }
 
+  protected final void setTerminationStatus(TerminationStatus status) {
+    this.terminationStatus = status;
+  }
+
 
   private final class ServiceDelegate extends AbstractIdleService {
     @Override

http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 9b6384c..0f8674b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -246,6 +246,13 @@ public final class TwillContainerLauncher {
     @Override
     public void completed(int exitStatus) {
       // count down the shutdownLatch to inform any waiting threads that this container is complete
+      if (exitStatus == 0) {
+        setTerminationStatus(TerminationStatus.SUCCEEDED);
+      } else if (exitStatus == 143) {
+        setTerminationStatus(TerminationStatus.KILLED);
+      } else {
+        setTerminationStatus(TerminationStatus.FAILED);
+      }
       shutdownLatch.countDown();
       synchronized (this) {
         forceShutDown();

http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 6ea7d8f..335d7ec 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -166,13 +166,14 @@ final class YarnTwillController extends AbstractTwillController implements Twill
       kill();
     }
 
+    FinalApplicationStatus finalStatus;
     // Poll application status from yarn
     try (ProcessController<YarnApplicationReport> processController = this.processController) {
       Stopwatch stopWatch = new Stopwatch().start();
       long maxTime = TimeUnit.MILLISECONDS.convert(Constants.APPLICATION_MAX_STOP_SECONDS, TimeUnit.SECONDS);
 
       YarnApplicationReport report = processController.getReport();
-      FinalApplicationStatus finalStatus = report.getFinalApplicationStatus();
+      finalStatus = report.getFinalApplicationStatus();
       ApplicationId appId = report.getApplicationId();
       while (finalStatus == FinalApplicationStatus.UNDEFINED &&
           stopWatch.elapsedTime(TimeUnit.MILLISECONDS) < maxTime) {
@@ -180,18 +181,28 @@ final class YarnTwillController extends AbstractTwillController implements Twill
         TimeUnit.SECONDS.sleep(1);
         finalStatus = processController.getReport().getFinalApplicationStatus();
       }
-      LOG.debug("Yarn application {} {} completed with status {}", appName, appId, finalStatus);
 
       // Application not finished after max stop time, kill the application
       if (finalStatus == FinalApplicationStatus.UNDEFINED) {
         kill();
+        finalStatus = FinalApplicationStatus.KILLED;
       }
     } catch (Exception e) {
       LOG.warn("Exception while waiting for application report: {}", e.getMessage(), e);
       kill();
+      finalStatus = FinalApplicationStatus.KILLED;
     }
 
     super.doShutDown();
+
+    if (finalStatus == FinalApplicationStatus.FAILED) {
+      // If we know the app status is failed, throw an exception to make this controller goes into error state.
+      // All other final status are not treated as failure as we can't be sure.
+      setTerminationStatus(TerminationStatus.FAILED);
+      throw new RuntimeException(String.format("Yarn application completed with failure %s, %s.", appName, getRunId()));
+    }
+    setTerminationStatus(finalStatus == FinalApplicationStatus.SUCCEEDED
+                           ? TerminationStatus.SUCCEEDED : TerminationStatus.KILLED);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/twill/blob/cc79f0d0/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
index 51031d4..6fbdc2d 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TaskCompletedTestRun.java
@@ -20,6 +20,7 @@ package org.apache.twill.yarn;
 import com.google.common.base.Throwables;
 import org.apache.twill.api.AbstractTwillRunnable;
 import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.ServiceController;
 import org.apache.twill.api.TwillController;
 import org.apache.twill.api.TwillRunner;
 import org.apache.twill.api.logging.PrinterLogHandler;
@@ -82,7 +83,32 @@ public final class TaskCompletedTestRun extends BaseYarnTest {
 
     Assert.assertTrue(runLatch.await(1, TimeUnit.MINUTES));
     controller.awaitTerminated(1, TimeUnit.MINUTES);
+    Assert.assertEquals(ServiceController.TerminationStatus.SUCCEEDED, controller.getTerminationStatus());
+  }
+
+  @Test
+  public void testFailureComplete() throws TimeoutException, ExecutionException, InterruptedException {
+    TwillRunner twillRunner = getTwillRunner();
+
+    // Start the app with an invalid ClassLoader. This will cause the AM fails to start.
+    TwillController controller = twillRunner.prepare(new SleepTask(),
+                                                     ResourceSpecification.Builder.with()
+                                                       .setVirtualCores(1)
+                                                       .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+                                                       .setInstances(1).build())
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      .setClassLoader("InvalidClassLoader")
+      .start();
+
+    final CountDownLatch terminateLatch = new CountDownLatch(1);
+    controller.onTerminated(new Runnable() {
+      @Override
+      public void run() {
+        terminateLatch.countDown();
+      }
+    }, Threads.SAME_THREAD_EXECUTOR);
 
-    TimeUnit.SECONDS.sleep(2);
+    Assert.assertTrue(terminateLatch.await(2, TimeUnit.MINUTES));
+    Assert.assertEquals(ServiceController.TerminationStatus.FAILED, controller.getTerminationStatus());
   }
 }