You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/07/31 23:14:26 UTC

incubator-reef git commit: [REEF-544] Propagate exception to RuntimeStop in DriverStatusManager.onError

Repository: incubator-reef
Updated Branches:
  refs/heads/master 9ebba41b2 -> 86413669c


[REEF-544] Propagate exception to RuntimeStop in DriverStatusManager.onError

This addressed the issue by
  * Propagating exceptions to RuntimeStopHandlers by adding a
    Clock.stop(Throwable) overload.
  * Installing REEFUncaughtExceptionHandler as the default uncaught exception handler in REEFLauncher.

JIRA:
  [REEF-544](https://issues.apache.org/jira/browse/REEF-544)

Pull Request:
  This closes #327


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/86413669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/86413669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/86413669

Branch: refs/heads/master
Commit: 86413669ce69195c8af9d92c82ed251432a6504c
Parents: 9ebba41
Author: Andrew Chung <af...@gmail.com>
Authored: Fri Jul 31 11:39:14 2015 -0700
Committer: Markus Weimer <we...@apache.org>
Committed: Fri Jul 31 14:13:00 2015 -0700

----------------------------------------------------------------------
 .../reef/runtime/common/REEFLauncher.java       |  3 ++
 .../common/driver/DriverStatusManager.java      |  2 +-
 .../launch/REEFUncaughtExceptionHandler.java    | 47 ++++++++++++++------
 .../yarn/driver/YarnContainerManager.java       |  2 +-
 .../java/org/apache/reef/wake/time/Clock.java   |  7 +++
 .../reef/wake/time/runtime/RuntimeClock.java    | 12 ++++-
 .../wake/time/runtime/event/RuntimeStop.java    |  7 ++-
 7 files changed, 59 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/86413669/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
index 41214b4..b5d4d15 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java
@@ -22,6 +22,7 @@ import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
 import org.apache.reef.runtime.common.launch.ProfilingStopHandler;
 import org.apache.reef.runtime.common.launch.REEFErrorHandler;
 import org.apache.reef.runtime.common.launch.REEFMessageCodec;
+import org.apache.reef.runtime.common.launch.REEFUncaughtExceptionHandler;
 import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
 import org.apache.reef.tang.*;
 import org.apache.reef.tang.annotations.Name;
@@ -162,6 +163,8 @@ public final class REEFLauncher {
     }
 
     final REEFLauncher launcher = getREEFLauncher(args[0]);
+
+    Thread.setDefaultUncaughtExceptionHandler(new REEFUncaughtExceptionHandler(launcher.clockConfig));
     launcher.logVersion();
 
     try (final Clock clock = launcher.getClockFromConfig()) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/86413669/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
index 33f551f..0c913de 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -143,7 +143,7 @@ public final class DriverStatusManager {
     } else {
       LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
       this.shutdownCause = Optional.of(exception);
-      this.clock.stop();
+      this.clock.stop(exception);
       this.setStatus(DriverStatus.FAILING);
     }
     LOG.exiting(DriverStatusManager.class.getCanonicalName(), "onError", new Object[]{exception});

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/86413669/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
index ef25689..2b19f33 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
@@ -18,6 +18,10 @@
  */
 package org.apache.reef.runtime.common.launch;
 
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
 import javax.inject.Inject;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -30,38 +34,53 @@ import java.util.logging.Logger;
  * <p/>
  * After sending the exception, this shuts down the JVM, as this JVM is then officially dead.
  */
-final class REEFUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+public final class REEFUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
   private static final Logger LOG = Logger.getLogger(REEFUncaughtExceptionHandler.class.getName());
-  private final REEFErrorHandler errorHandler;
+  private final Configuration errorHandlerConfig;
 
+  private REEFErrorHandler errorHandler;
 
   /**
-   * @param errorHandler
+   * @param errorHandlerConfig
    */
   @Inject
-  REEFUncaughtExceptionHandler(final REEFErrorHandler errorHandler) {
-    this.errorHandler = errorHandler;
+  public REEFUncaughtExceptionHandler(final Configuration errorHandlerConfig) {
+    this.errorHandlerConfig = errorHandlerConfig;
+    this.errorHandler = null;
   }
 
   @Override
   public synchronized void uncaughtException(final Thread thread, final Throwable throwable) {
+    if (this.errorHandler == null) {
+      try {
+        this.errorHandler = Tang.Factory.getTang().newInjector(this.errorHandlerConfig)
+            .getInstance(REEFErrorHandler.class);
+      } catch (InjectionException ie) {
+        LOG.log(Level.WARNING, "Unable to inject error handler.");
+      }
+    }
+
     final String msg = "Thread " + thread.getName() + " threw an uncaught exception.";
-    LOG.log(Level.SEVERE, msg, throwable);
-    this.errorHandler.onNext(new Exception(msg, throwable));
-    try {
-      this.wait(100);
-    } catch (final InterruptedException e) {
-      // try-catch block used to wait and give process a chance to setup communication with its parent
+
+    if (this.errorHandler != null) {
+      LOG.log(Level.SEVERE, msg, throwable);
+      this.errorHandler.onNext(new Exception(msg, throwable));
+      try {
+        this.wait(100);
+      } catch (final InterruptedException e) {
+        // try-catch block used to wait and give process a chance to setup communication with its parent
+      }
+      this.errorHandler.close();
     }
-    this.errorHandler.close();
-    LOG.log(Level.SEVERE, "System.exit(1)");
+
+    LOG.log(Level.SEVERE, msg + " System.exit(1)");
     System.exit(1);
   }
 
   @Override
   public String toString() {
     return "REEFUncaughtExceptionHandler{" +
-        "errorHandler=" + errorHandler +
+        "errorHandler=" + String.valueOf(this.errorHandler) +
         '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/86413669/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 5a8eac1..0e7d0e7 100644
--- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -264,7 +264,7 @@ final class YarnContainerManager
     }
   }
 
-  void onStop(final Exception exception) {
+  void onStop(final Throwable exception) {
 
     LOG.log(Level.FINE, "Stop Runtime: RM status {0}", this.resourceManager.getServiceState());
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/86413669/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
index 639ce02..fce024c 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
@@ -64,6 +64,13 @@ public interface Clock extends Runnable, AutoCloseable {
   void stop();
 
   /**
+   * This stops the clock immediately, without waiting for
+   * client alarms to finish. Stops with an exception that
+   * is propagated to RuntimeStopHandlers.
+   */
+  void stop(final Throwable exception);
+
+  /**
    * Clock is idle if it has no future Alarms set.
    *
    * @return true if idle, otherwise false

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/86413669/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index 65c75ac..acd0c48 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -51,6 +51,7 @@ public final class RuntimeClock implements Clock {
   private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler;
   private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler;
 
+  private Throwable stoppedOnException;
   private boolean closed = false;
 
   @Inject
@@ -72,6 +73,8 @@ public final class RuntimeClock implements Clock {
     this.runtimeStopHandler = runtimeStopHandler;
     this.idleHandler = idleHandler;
 
+    this.stoppedOnException = null;
+
     LOG.log(Level.FINE, "RuntimeClock instantiated.");
   }
 
@@ -100,12 +103,20 @@ public final class RuntimeClock implements Clock {
 
   @Override
   public void stop() {
+    this.stop(null);
+  }
+
+  @Override
+  public void stop(final Throwable stopOnException) {
     LOG.entering(RuntimeClock.class.getCanonicalName(), "stop");
     synchronized (this.schedule) {
       this.schedule.clear();
       this.schedule.add(new StopTime(timer.getCurrent()));
       this.schedule.notifyAll();
       this.closed = true;
+      if (this.stoppedOnException != null) {
+        this.stoppedOnException = stopOnException;
+      }
     }
     LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop");
   }
@@ -127,7 +138,6 @@ public final class RuntimeClock implements Clock {
     LOG.exiting(RuntimeClock.class.getCanonicalName(), "close");
   }
 
-
   /**
    * Finds an acceptable stop time, which is the
    * a time beyond that of any client alarm.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/86413669/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/RuntimeStop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/RuntimeStop.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/RuntimeStop.java
index 8f8b28c..ac4534a 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/RuntimeStop.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/RuntimeStop.java
@@ -20,21 +20,20 @@ package org.apache.reef.wake.time.runtime.event;
 
 import org.apache.reef.wake.time.Time;
 
-
 public class RuntimeStop extends Time {
 
-  private final Exception exception;
+  private final Throwable exception;
 
   public RuntimeStop(final long timestamp) {
     this(timestamp, null);
   }
 
-  public RuntimeStop(final long timestamp, final Exception exception) {
+  public RuntimeStop(final long timestamp, final Throwable exception) {
     super(timestamp);
     this.exception = exception;
   }
 
-  public final Exception getException() {
+  public final Throwable getException() {
     return this.exception;
   }
 }