You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:07 UTC

[34/51] [partial] incubator-reef git commit: [REEF-93] Move java sources to lang/java

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
new file mode 100644
index 0000000..8084068
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.runtime.common.Launcher;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
+  private static final String DEFAULT_JAVA_PATH = System.getenv("JAVA_HOME") + "/bin/" + "java";
+  private String stderrPath = null;
+  private String stdoutPath = null;
+  private String errorHandlerRID = null;
+  private String launchID = null;
+  private int megaBytes = 0;
+  private String evaluatorConfigurationPath = null;
+  private String javaPath = null;
+  private String classPath = null;
+  private Boolean assertionsEnabled = null;
+
+  @Override
+  public List<String> build() {
+    return new ArrayList<String>() {{
+
+      if (javaPath == null || javaPath.isEmpty()) {
+        add(DEFAULT_JAVA_PATH);
+      } else {
+        add(javaPath);
+      }
+
+      add("-XX:PermSize=128m");
+      add("-XX:MaxPermSize=128m");
+      // Set Xmx based on am memory size
+      add("-Xmx" + megaBytes + "m");
+
+      if ((assertionsEnabled != null && assertionsEnabled)
+          || EnvironmentUtils.areAssertionsEnabled()) {
+        add("-ea");
+      }
+
+      if (classPath != null && !classPath.isEmpty()) {
+        add("-classpath");
+        add(classPath);
+      }
+
+      Launcher.propagateProperties(this, true, "proc_reef");
+      Launcher.propagateProperties(this, false,
+          "java.util.logging.config.file", "java.util.logging.config.class");
+
+      add(Launcher.class.getName());
+
+      add("-" + ErrorHandlerRID.SHORT_NAME);
+      add(errorHandlerRID);
+      add("-" + LaunchID.SHORT_NAME);
+      add(launchID);
+      add("-" + ClockConfigurationPath.SHORT_NAME);
+      add(evaluatorConfigurationPath);
+
+      if (stdoutPath != null && !stdoutPath.isEmpty()) {
+        add("1>");
+        add(stdoutPath);
+      }
+
+      if (stderrPath != null && !stderrPath.isEmpty()) {
+        add("2>");
+        add(stderrPath);
+      }
+    }};
+  }
+
+  @Override
+  public JavaLaunchCommandBuilder setErrorHandlerRID(final String errorHandlerRID) {
+    this.errorHandlerRID = errorHandlerRID;
+    return this;
+  }
+
+  @Override
+  public JavaLaunchCommandBuilder setLaunchID(final String launchID) {
+    this.launchID = launchID;
+    return this;
+  }
+
+  @Override
+  public JavaLaunchCommandBuilder setMemory(final int megaBytes) {
+    this.megaBytes = megaBytes;
+    return this;
+  }
+
+  @Override
+  public JavaLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) {
+    this.evaluatorConfigurationPath = configurationFileName;
+    return this;
+  }
+
+  @Override
+  public JavaLaunchCommandBuilder setStandardOut(final String standardOut) {
+    this.stdoutPath = standardOut;
+    return this;
+  }
+
+  @Override
+  public JavaLaunchCommandBuilder setStandardErr(final String standardErr) {
+    this.stderrPath = standardErr;
+    return this;
+  }
+
+  /**
+   * Set the path to the java executable. Will default to a heuristic search if not set.
+   *
+   * @param path
+   * @return this
+   */
+  public JavaLaunchCommandBuilder setJavaPath(final String path) {
+    this.javaPath = path;
+    return this;
+  }
+
+  public JavaLaunchCommandBuilder setClassPath(final String classPath) {
+    this.classPath = classPath;
+    return this;
+  }
+
+  public JavaLaunchCommandBuilder setClassPath(final Collection<String> classPathElements) {
+    this.classPath = StringUtils.join(classPathElements, File.pathSeparatorChar);
+    return this;
+  }
+
+  /**
+   * Enable or disable assertions on the child process.
+   * If not set, the setting is taken from the JVM that executes the code.
+   *
+   * @param assertionsEnabled
+   * @return this
+   */
+  public JavaLaunchCommandBuilder enableAssertions(final boolean assertionsEnabled) {
+    this.assertionsEnabled = assertionsEnabled;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java
new file mode 100644
index 0000000..8c40439
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.REEFVersion;
+import org.apache.reef.wake.profiler.WakeProfiler;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This encapsulates processes started by REEF.
+ */
+public final class LaunchClass implements AutoCloseable, Runnable {
+
+  private static final Logger LOG = Logger.getLogger(LaunchClass.class.getName());
+  private final RemoteManager remoteManager;
+  private final String launchID;
+  private final String errorHandlerID;
+  private final String evaluatorConfigurationPath;
+  private final boolean isProfilingEnabled;
+  private final REEFErrorHandler errorHandler;
+  private final ConfigurationSerializer configurationSerializer;
+  private WakeProfiler profiler;
+
+  @Inject
+  LaunchClass(final RemoteManager remoteManager,
+              final REEFUncaughtExceptionHandler uncaughtExceptionHandler,
+              final REEFErrorHandler errorHandler,
+              final @Parameter(LaunchID.class) String launchID,
+              final @Parameter(ErrorHandlerRID.class) String errorHandlerID,
+              final @Parameter(ClockConfigurationPath.class) String evaluatorConfigurationPath,
+              final @Parameter(ProfilingEnabled.class) boolean enableProfiling,
+              final ConfigurationSerializer configurationSerializer,
+              final REEFVersion reefVersion) {
+    reefVersion.logVersion();
+    this.remoteManager = remoteManager;
+    this.launchID = launchID;
+    this.errorHandlerID = errorHandlerID;
+    this.evaluatorConfigurationPath = evaluatorConfigurationPath;
+    this.isProfilingEnabled = enableProfiling;
+    this.errorHandler = errorHandler;
+    this.configurationSerializer = configurationSerializer;
+
+
+    // Registering a default exception handler. It sends every exception to the upstream RemoteManager
+    Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
+
+
+    if (isProfilingEnabled) {
+      this.profiler = new WakeProfiler();
+      ProfilingStopHandler.setProfiler(profiler); // TODO: This probably should be bound via Tang.
+    }
+  }
+
+  /**
+   * Loads the client and resource manager configuration files from disk.
+   */
+  private Configuration getClockConfiguration() {
+    return Configurations.merge(readConfigurationFromDisk(), getStaticClockConfiguration());
+  }
+
+
+  private Configuration readConfigurationFromDisk() {
+    LOG.log(Level.FINEST, "Loading configuration file: {0}", this.evaluatorConfigurationPath);
+
+    final File evaluatorConfigFile = new File(this.evaluatorConfigurationPath);
+
+    if (!evaluatorConfigFile.exists()) {
+      final String message = "The configuration file " + this.evaluatorConfigurationPath +
+          "doesn't exist. This points to an issue in the job submission.";
+      fail(message, new FileNotFoundException());
+      throw new RuntimeException(message);
+    } else if (!evaluatorConfigFile.canRead()) {
+      final String message = "The configuration file " + this.evaluatorConfigurationPath + " exists, but can't be read";
+      fail(message, new IOException());
+      throw new RuntimeException(message);
+    } else {
+      try {
+        return this.configurationSerializer.fromFile(evaluatorConfigFile);
+      } catch (final IOException e) {
+        final String message = "Unable to parse the configuration file " + this.evaluatorConfigurationPath;
+        fail(message, e);
+        throw new RuntimeException(message, e);
+      }
+    }
+  }
+
+  /**
+   * @return the part of the clock configuration *not* read from disk.
+   */
+  private Configuration getStaticClockConfiguration() {
+    final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(LaunchID.class, this.launchID)
+        .bindNamedParameter(ErrorHandlerRID.class, this.errorHandlerID)
+        .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class)
+        .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+        .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
+        .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class);
+    if (this.isProfilingEnabled) {
+      builder.bindSetEntry(Clock.StopHandler.class, ProfilingStopHandler.class);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Instantiates the clock.
+   *
+   * @return a clock object.
+   */
+  private Clock getClock() {
+    try {
+      final Injector clockInjector = Tang.Factory.getTang().newInjector(this.getClockConfiguration());
+      if (isProfilingEnabled) {
+        clockInjector.bindAspect(profiler);
+      }
+      clockInjector.bindVolatileInstance(RemoteManager.class, this.remoteManager);
+      return clockInjector.getInstance(Clock.class);
+    } catch (final Throwable ex) {
+      fail("Unable to instantiate the clock", ex);
+      throw new RuntimeException("Unable to instantiate the clock", ex);
+    }
+  }
+
+  /**
+   * Starts the Clock.
+   * This blocks until the clock returns.
+   */
+  @Override
+  public void run() {
+    LOG.entering(this.getClass().getName(), "run", "Starting the clock");
+    try {
+      this.getClock().run();
+    } catch (final Throwable t) {
+      fail("Fatal exception while executing the clock", t);
+    }
+    LOG.exiting(this.getClass().getName(), "run", "Clock terminated");
+  }
+
+  /**
+   * Closes the remote manager managed by this class.
+   *
+   * @throws Exception
+   */
+  @Override
+  public void close() throws Exception {
+    LOG.entering(this.getClass().getName(), "close");
+    this.errorHandler.close(); // Also closes the remoteManager
+    LOG.exiting(this.getClass().getName(), "close");
+  }
+
+  private void fail(final String message, final Throwable throwable) {
+    this.errorHandler.onNext(new Exception(message, throwable));
+  }
+
+  @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
+  public final static class ProfilingEnabled implements Name<Boolean> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
new file mode 100644
index 0000000..6874b83
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import java.util.List;
+
+/**
+ * Used to build the launch command for REEF processes.
+ */
+public interface LaunchCommandBuilder {
+
+
+  /**
+   * @return the launch command line
+   */
+  public List<String> build();
+
+  public LaunchCommandBuilder setErrorHandlerRID(final String errorHandlerRID);
+
+  public LaunchCommandBuilder setLaunchID(final String launchID);
+
+  public LaunchCommandBuilder setMemory(final int megaBytes);
+
+  /**
+   * Set the name of the configuration file for the Launcher. This file is assumed to exist in the working directory of
+   * the process launched with this command line.
+   *
+   * @param configurationFileName
+   * @return this
+   */
+  public LaunchCommandBuilder setConfigurationFileName(final String configurationFileName);
+
+  /**
+   * Names a file to which stdout will be redirected.
+   *
+   * @param standardOut
+   * @return this
+   */
+  public LaunchCommandBuilder setStandardOut(final String standardOut);
+
+  /**
+   * Names a file to which stderr will be redirected.
+   *
+   * @param standardErr
+   * @return this
+   */
+  public LaunchCommandBuilder setStandardErr(final String standardErr);
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java
new file mode 100644
index 0000000..3ddabb0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.profiler.WakeProfiler;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An EventHandler that writes out the profiler results.
+ */
+final class ProfilingStopHandler implements EventHandler<StopTime> {
+  private static final Logger LOG = Logger.getLogger(ProfilingStopHandler.class.getName());
+  private static WakeProfiler profiler;
+  private final String launchID;
+
+  @Inject
+  public ProfilingStopHandler(final @Parameter(LaunchID.class) String launchID) {
+    this.launchID = launchID;
+  }
+
+  static void setProfiler(final WakeProfiler profiler) {
+    ProfilingStopHandler.profiler = profiler;
+  }
+
+  @Override
+  public void onNext(final StopTime stopTime) {
+    try (final PrintWriter out = new PrintWriter("profile-" + launchID + ".json")) {
+      out.print(profiler.objectGraphToString());
+    } catch (final FileNotFoundException e) {
+      LOG.log(Level.WARNING, "Unable to write the profile", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
new file mode 100644
index 0000000..544fcc6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * The error handler REEF registers with Wake.
+ */
+public final class REEFErrorHandler implements EventHandler<Throwable> {
+
+  private static final Logger LOG = Logger.getLogger(REEFErrorHandler.class.getName());
+
+  // This class is used as the ErrorHandler in the RemoteManager. Hence, we need an InjectionFuture here.
+  private final InjectionFuture<RemoteManager> remoteManager;
+  private final String launchID;
+  private final String errorHandlerRID;
+  private final ExceptionCodec exceptionCodec;
+
+  @Inject
+  REEFErrorHandler(final InjectionFuture<RemoteManager> remoteManager,
+                   final @Parameter(ErrorHandlerRID.class) String errorHandlerRID,
+                   final @Parameter(LaunchID.class) String launchID,
+                   final ExceptionCodec exceptionCodec) {
+    this.errorHandlerRID = errorHandlerRID;
+    this.remoteManager = remoteManager;
+    this.launchID = launchID;
+    this.exceptionCodec = exceptionCodec;
+  }
+
+  @Override
+  public void onNext(final Throwable e) {
+    LOG.log(Level.SEVERE, "Uncaught exception.", e);
+    // TODO: This gets a new EventHandler each time an exception is caught. It would be better to cache the handler. But
+    // that introduces threading issues and isn't really worth it, as the JVM typically will be killed once we catch an
+    // Exception in here.
+    if (!this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) {
+      final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = this.remoteManager.get()
+          .getHandler(errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class);
+      final ReefServiceProtos.RuntimeErrorProto message = ReefServiceProtos.RuntimeErrorProto.newBuilder()
+          .setName("reef")
+          .setIdentifier(launchID)
+          .setMessage(e.getMessage())
+          .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(e)))
+          .build();
+      try {
+        runtimeErrorHandler.onNext(message);
+      } catch (final Throwable t) {
+        LOG.log(Level.SEVERE, "Unable to send the error upstream", t);
+      }
+    } else {
+      LOG.log(Level.SEVERE, "Caught an exception from Wake we cannot send upstream because there is no upstream");
+    }
+  }
+
+  public void close() {
+    try {
+      this.remoteManager.get().close();
+    } catch (final Throwable ex) {
+      LOG.log(Level.SEVERE, "Unable to close the remote manager", ex);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "REEFErrorHandler{" +
+        "remoteManager=" + remoteManager +
+        ", launchID='" + launchID + '\'' +
+        ", errorHandlerRID='" + errorHandlerRID + '\'' +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
new file mode 100644
index 0000000..8a81184
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.REEFProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+
+/**
+ * Codec for REEF's control flow messages.
+ */
+@Private
+@Provided
+@ClientSide
+@DriverSide
+@EvaluatorSide
+public final class REEFMessageCodec implements Codec<GeneratedMessage> {
+
+
+  @Inject
+  private REEFMessageCodec() {
+  }
+
+  @Override
+  public GeneratedMessage decode(final byte[] bytes) {
+    try {
+      final REEFProtocol.REEFMessage message = REEFProtocol.REEFMessage.parseFrom(bytes);
+      if (message.hasJobSubmission()) {
+        return message.getJobSubmission();
+      } else if (message.hasJobControl()) {
+        return message.getJobControl();
+      } else if (message.hasRuntimeError()) {
+        return message.getRuntimeError();
+      } else if (message.hasJobStatus()) {
+        return message.getJobStatus();
+      } else if (message.hasEvaluatorControl()) {
+        return message.getEvaluatorControl();
+      } else if (message.hasEvaluatorHeartBeat()) {
+        return message.getEvaluatorHeartBeat();
+      }
+      throw new RuntimeException("Unable to decode a message: " + message.toString());
+    } catch (final InvalidProtocolBufferException e) {
+      throw new RuntimeException("Unable to decode a message", e);
+    }
+  }
+
+  @Override
+  public byte[] encode(final GeneratedMessage msg) {
+    final REEFProtocol.REEFMessage.Builder message = REEFProtocol.REEFMessage.newBuilder();
+
+    if (msg instanceof ClientRuntimeProtocol.JobSubmissionProto) {
+      message.setJobSubmission((ClientRuntimeProtocol.JobSubmissionProto) msg);
+    } else if (msg instanceof ClientRuntimeProtocol.JobControlProto) {
+      message.setJobControl((ClientRuntimeProtocol.JobControlProto) msg);
+    } else if (msg instanceof ReefServiceProtos.RuntimeErrorProto) {
+      message.setRuntimeError((ReefServiceProtos.RuntimeErrorProto) msg);
+    } else if (msg instanceof ReefServiceProtos.JobStatusProto) {
+      message.setJobStatus((ReefServiceProtos.JobStatusProto) msg);
+    } else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorControlProto) {
+      message.setEvaluatorControl((EvaluatorRuntimeProtocol.EvaluatorControlProto) msg);
+    } else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) {
+      message.setEvaluatorHeartBeat((EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) msg);
+    } else {
+      throw new RuntimeException("Unable to serialize: " + msg);
+    }
+
+    return message.build().toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
new file mode 100644
index 0000000..5190f86
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is used as the Exception handler for REEF client processes (Driver, Evaluator).
+ * <p/>
+ * It catches all exceptions and sends them to the controlling process.
+ * For Evaluators, that is the Driver. For the Driver, that is the Client.
+ * <p/>
+ * After sending the exception, this shuts down the JVM, as this JVM is then officially dead.
+ */
+final class REEFUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+  private static final Logger LOG = Logger.getLogger(REEFUncaughtExceptionHandler.class.getName());
+  private final REEFErrorHandler errorHandler;
+
+
+  /**
+   * @param errorHandler
+   */
+  @Inject
+  REEFUncaughtExceptionHandler(final REEFErrorHandler errorHandler) {
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public final synchronized void uncaughtException(final Thread thread, final Throwable throwable) {
+    final String msg = "Thread " + thread.getName() + " threw an uncaught exception.";
+    LOG.log(Level.SEVERE, msg, throwable);
+    this.errorHandler.onNext(new Exception(msg, throwable));
+    try {
+      this.wait(100); // TODO: Remove
+    } catch (final InterruptedException e) {
+
+    }
+    this.errorHandler.close();
+    LOG.log(Level.SEVERE, "System.exit(1)");
+    System.exit(1);
+  }
+
+  @Override
+  public String toString() {
+    return "REEFUncaughtExceptionHandler{" +
+        "errorHandler=" + errorHandler +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java
new file mode 100644
index 0000000..fef18f4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Common launch code between Driver and Evaluator.
+ */
+package org.apache.reef.runtime.common.launch;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java
new file mode 100644
index 0000000..3715774
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The path to the clock configuration
+ */
+@NamedParameter(doc = "The path to process configuration.", short_name = ClockConfigurationPath.SHORT_NAME)
+public final class ClockConfigurationPath implements Name<String> {
+  public static final String SHORT_NAME = "runtime_configuration";
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java
new file mode 100644
index 0000000..0285ad0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The error handler remote identifier.", short_name = ErrorHandlerRID.SHORT_NAME, default_value = ErrorHandlerRID.NONE)
+public final class ErrorHandlerRID implements Name<String> {
+  /**
+   * Indicates that no ErrorHandler is bound.
+   */
+  // TODO: Find all comparisons with this and see whether we can do something about them
+  public static final String NONE = "NO_ERROR_HANDLER_REMOTE_ID";
+
+  /**
+   * Short name for the command line and such.
+   */
+  public static final String SHORT_NAME = "error_handler_rid";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java
new file mode 100644
index 0000000..7dc8546
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The launch identifier.", short_name = LaunchID.SHORT_NAME)
+public final class LaunchID implements Name<String> {
+  public static final String SHORT_NAME = "launch_id";
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java
new file mode 100644
index 0000000..155c399
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Parameters of the Launcher.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java
new file mode 100644
index 0000000..57a7e54
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * REEF implementation.
+ * This is referred to as the `common resourcemanager` that contains code
+ * common to implementations of REEF on all resource managers.
+ * The individual resource managers are implemented by providing implementations
+ * of the various interfaces presribed in sub-packages called `API`.
+ */
+package org.apache.reef.runtime.common;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java
new file mode 100644
index 0000000..891d890
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Whether or not to delete the temporary files created during Driver and Evaluator submission.
+ */
+@NamedParameter(doc = "Whether or not to delete the temporary files created during Driver and Evaluator submission.", default_value = "true")
+public final class DeleteTempFiles implements Name<Boolean> {
+  private DeleteTempFiles() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java
new file mode 100644
index 0000000..3caa66b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The fraction of the container memory NOT to use for the Java Heap.
+ */
+@NamedParameter(doc = "The fraction of the container memory NOT to use for the Java Heap.", default_value = "0.0")
+public final class JVMHeapSlack implements Name<Double> {
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java
new file mode 100644
index 0000000..49fe345
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class BroadCastEventHandler<E> implements EventHandler<E> {
+  private final List<EventHandler<E>> handlers;
+
+  public BroadCastEventHandler(final Collection<EventHandler<E>> handlers) {
+    this.handlers = new ArrayList<>(handlers);
+  }
+
+  @Override
+  public void onNext(final E event) {
+    for (final EventHandler<E> handler : handlers)
+      handler.onNext(event);
+  }
+
+  public void addEventHandler(final EventHandler<E> eventHandler) {
+    this.handlers.add(eventHandler);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java
new file mode 100644
index 0000000..1a80f96
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.commons.lang.SerializationException;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default implementation of ExceptionCodec that uses Java serialization as its implementation.
+ */
+@Private
+final class DefaultExceptionCodec implements ExceptionCodec {
+  private static final Logger LOG = Logger.getLogger(DefaultExceptionCodec.class.getName());
+
+  @Inject
+  DefaultExceptionCodec() {
+  }
+
+  @Override
+  public Optional<Throwable> fromBytes(final byte[] bytes) {
+    try {
+      return Optional.<Throwable>of((Throwable) SerializationUtils.deserialize(bytes));
+    } catch (SerializationException | IllegalArgumentException e) {
+      LOG.log(Level.FINE, "Unable to deserialize a Throwable.", e);
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public Optional<Throwable> fromBytes(final Optional<byte[]> bytes) {
+    if (bytes.isPresent()) {
+      return this.fromBytes(bytes.get());
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public byte[] toBytes(final Throwable throwable) {
+    return SerializationUtils.serialize(throwable);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
new file mode 100644
index 0000000..64c32e8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.tang.util.MonotonicHashMap;
+import org.apache.reef.util.ExceptionHandlingEventHandler;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Delayed event router that dispatches messages to the proper event handler by type.
+ * This class is used in EvaluatorManager to isolate user threads from REEF.
+ */
+@Private
+@DriverSide
+public final class DispatchingEStage implements AutoCloseable {
+
+  /**
+   * A map of event handlers, populated in the register() method.
+   */
+  private final Map<Class<?>, EventHandler<?>> handlers =
+      Collections.synchronizedMap(new MonotonicHashMap<Class<?>, EventHandler<?>>());
+  /**
+   * Exception handler, one for all event handlers.
+   */
+  private final EventHandler<Throwable> errorHandler;
+  /**
+   * Thread pool to process delayed event handler invocations.
+   */
+  private final ThreadPoolStage<DelayedOnNext> stage;
+
+  /**
+   * @param errorHandler used for exceptions thrown from the event handlers registered.
+   * @param numThreads   number of threads to allocate to dispatch events.
+   * @param stageName    the name to use for the underlying stage. It will be carried over to name the Thread(s) spawned.
+   */
+  public DispatchingEStage(final EventHandler<Throwable> errorHandler,
+                           final int numThreads,
+                           final String stageName) {
+    this.errorHandler = errorHandler;
+    this.stage = new ThreadPoolStage<>(stageName,
+        new EventHandler<DelayedOnNext>() {
+          @Override
+          public void onNext(final DelayedOnNext promise) {
+            promise.handler.onNext(promise.message);
+          }
+        }, numThreads
+    );
+
+  }
+
+  /**
+   * Constructs a DispatchingEStage that uses the Thread pool and ErrorHandler of another one.
+   *
+   * @param other
+   */
+  public DispatchingEStage(final DispatchingEStage other) {
+    this.errorHandler = other.errorHandler;
+    this.stage = other.stage;
+  }
+
+  /**
+   * Register a new event handler.
+   *
+   * @param type     Message type to process with this handler.
+   * @param handlers A set of handlers that process that type of message.
+   * @param <T>      Message type.
+   * @param <U>      Type of message that event handler supports. Must be a subclass of T.
+   */
+  public <T, U extends T> void register(final Class<T> type, final Set<EventHandler<U>> handlers) {
+    this.handlers.put(type, new ExceptionHandlingEventHandler<>(
+        new BroadCastEventHandler<>(handlers), this.errorHandler));
+  }
+
+  /**
+   * Dispatch a new message by type.
+   *
+   * @param type    Type of event handler - must match the register() call.
+   * @param message A message to process. Must be a subclass of T.
+   * @param <T>     Message type that event handler supports.
+   * @param <U>     input message type. Must be a subclass of T.
+   */
+  @SuppressWarnings("unchecked")
+  public <T, U extends T> void onNext(final Class<T> type, final U message) {
+    final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
+    this.stage.onNext(new DelayedOnNext(handler, message));
+  }
+
+  /**
+   * Return true if there are no messages queued or in processing, false otherwise.
+   */
+  public boolean isEmpty() {
+    return this.stage.getQueueLength() == 0;
+  }
+
+  /**
+   * Close the internal thread pool.
+   *
+   * @throws Exception forwarded from EStage.close() call.
+   */
+  @Override
+  public void close() throws Exception {
+    this.stage.close();
+  }
+
+  /**
+   * Delayed EventHandler.onNext() call.
+   * Contains a message object and EventHandler to process it.
+   */
+  private static final class DelayedOnNext {
+
+    public final EventHandler<Object> handler;
+    public final Object message;
+
+    @SuppressWarnings("unchecked")
+    public <T, U extends T> DelayedOnNext(final EventHandler<T> handler, final U message) {
+      this.handler = (EventHandler<Object>) handler;
+      this.message = message;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java
new file mode 100644
index 0000000..37fa264
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+/**
+ * (De-)serializes exceptions.
+ */
+@DefaultImplementation(DefaultExceptionCodec.class)
+public interface ExceptionCodec {
+
+  /**
+   * Deserializes a Throwable that has been serialized using toBytes().
+   *
+   * @param bytes
+   * @return the Throable or Optional.empty() if the deserialization fails.
+   */
+  public Optional<Throwable> fromBytes(final byte[] bytes);
+
+  /**
+   * @param bytes
+   * @return fromBytes(bytes.get()) or Optional.empty()
+   */
+  public Optional<Throwable> fromBytes(final Optional<byte[]> bytes);
+
+  /**
+   * @param throwable
+   * @return the serialized form of the given Throwable.
+   */
+  public byte[] toBytes(final Throwable throwable);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
new file mode 100644
index 0000000..e17108e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteIdentifierFactory;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class RemoteManager {
+
+  private static final Logger LOG = Logger.getLogger(RemoteManager.class.getName());
+
+  private final org.apache.reef.wake.remote.RemoteManager raw;
+  private final RemoteIdentifierFactory factory;
+
+  @Inject
+  public RemoteManager(final org.apache.reef.wake.remote.RemoteManager raw,
+                       final RemoteIdentifierFactory factory) {
+    this.raw = raw;
+    this.factory = factory;
+    LOG.log(Level.FINE, "Instantiated 'RemoteManager' with remoteId: {0}", this.getMyIdentifier());
+  }
+
+  public final org.apache.reef.wake.remote.RemoteManager raw() {
+    return this.raw;
+  }
+
+  public void close() throws Exception {
+    this.raw.close();
+  }
+
+  public <T> EventHandler<T> getHandler(
+      final String destinationIdentifier, final Class<? extends T> messageType) {
+    return this.raw.getHandler(factory.getNewInstance(destinationIdentifier), messageType);
+  }
+
+  public <T, U extends T> AutoCloseable registerHandler(
+      final String sourceIdentifier, final Class<U> messageType,
+      final EventHandler<T> theHandler) {
+    return this.raw.registerHandler(factory.getNewInstance(sourceIdentifier), messageType, theHandler);
+  }
+
+  public <T, U extends T> AutoCloseable registerHandler(
+      final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
+    return this.raw.registerHandler(messageType, theHandler);
+  }
+
+  public AutoCloseable registerErrorHandler(final EventHandler<Exception> theHandler) {
+    return this.raw.registerErrorHandler(theHandler);
+  }
+
+  public String getMyIdentifier() {
+    return this.raw.getMyIdentifier().toString();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java
new file mode 100644
index 0000000..7137420
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+
+import javax.inject.Inject;
+
+/**
+ * Helper class for immediately sending heartbeat messages.
+ * This can be used together with TaskMessageSource to push urgent messages to the Driver.
+ * <p/>
+ * CAUTION: Do not overuse as the Driver can be saturated with heartbeats.
+ *
+ * @see https://issues.apache.org/jira/browse/REEF-33 for the ongoing discussion of alternatives to this design.
+ */
+@TaskSide
+@Public
+@Unstable
+public class HeartBeatTriggerManager {
+  private final HeartBeatManager heartBeatManager;
+
+  @Inject
+  HeartBeatTriggerManager(final HeartBeatManager heartBeatManager) {
+    this.heartBeatManager = heartBeatManager;
+  }
+
+  /**
+   * Immediately send a heartbeat message to the Driver.
+   */
+  public void triggerHeartBeat() {
+    this.heartBeatManager.sendHeartbeat();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java
new file mode 100644
index 0000000..b042558
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+
+import java.util.concurrent.Callable;
+
+/**
+ * The interface for Tasks.
+ * <p/>
+ * This interface is to be implemented for Tasks.
+ * <p/>
+ * The main entry point for a Task is the call() method inherited from
+ * {@link Callable}. The REEF Evaluator will call this method in order to run
+ * the Task. The byte[] returned by it will be pushed to the Job Driver.
+ */
+@TaskSide
+@Public
+public interface Task {
+
+  /**
+   * Called by the resourcemanager harness to execute the task.
+   *
+   * @param memento the memento objected passed down by the driver.
+   * @return the user defined return value
+   * @throws Exception whenever the Task encounters an unsolved issue.
+   *                   This Exception will be thrown at the Driver's event handler.
+   */
+  public byte[] call(final byte[] memento) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java
new file mode 100644
index 0000000..60c8c33
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.Message;
+
+/**
+ * A message sent from a Task to a Driver.
+ */
+@EvaluatorSide
+@Public
+@Provided
+public final class TaskMessage implements Message {
+
+  private final String messageSourceID;
+  private final byte[] theBytes;
+
+  private TaskMessage(final String messageSourceID, final byte[] theBytes) {
+    this.messageSourceID = messageSourceID;
+    this.theBytes = theBytes;
+  }
+
+  /**
+   * @param messageSourceID The message's sourceID. This will be accessible in the Driver for routing.
+   * @param theBytes        The actual content of the message, serialized into a byte[]
+   * @return a new TaskMessage with the given content.
+   */
+  public static TaskMessage from(final String messageSourceID, final byte[] theBytes) {
+    assert (theBytes != null && messageSourceID != null);
+    return new TaskMessage(messageSourceID, theBytes);
+  }
+
+  /**
+   * @return the message source identifier.
+   */
+  public String getMessageSourceID() {
+    return this.messageSourceID;
+  }
+
+  /**
+   * @return the message
+   */
+  @Override
+  public byte[] get() {
+    return this.theBytes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java
new file mode 100644
index 0000000..38077e0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.util.Optional;
+
+/**
+ * Message source for control flow messages from a task to the Driver.
+ * <p/>
+ * The getMessage() method in an Implementation of this interface will be called by the Evaluator resourcemanager whenever it is
+ * about to communicate with the Driver anyway. Hence, this can be used for occasional status updates etc.
+ */
+@Public
+@EvaluatorSide
+public interface TaskMessageSource {
+
+  /**
+   * @return a message to be sent back to the Driver, or Optional.empty() if no message shall be sent at this time.
+   */
+  public Optional<TaskMessage> getMessage();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java
new file mode 100644
index 0000000..106bc5f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.util.Optional;
+
+/**
+ * Indicates that the driver called .close() on this Task.
+ */
+@TaskSide
+@Provided
+@Public
+public interface CloseEvent {
+
+  /**
+   * @return the message sent with the close call, if any.
+   */
+  public Optional<byte[]> get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java
new file mode 100644
index 0000000..dbfe025
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.util.Optional;
+
+/**
+ * Represents a message sent by the driver.
+ */
+@TaskSide
+@Public
+@Provided
+public interface DriverMessage {
+
+  /**
+   * @return the message sent by the driver.
+   */
+  public Optional<byte[]> get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java
new file mode 100644
index 0000000..f23f3bf
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.util.Optional;
+
+/**
+ * Event fired when the driver called suspend() on this task.
+ */
+@TaskSide
+@Provided
+@Public
+public interface SuspendEvent {
+
+  /**
+   * @return the message sent with the suspend call, if any.
+   */
+  public Optional<byte[]> get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java
new file mode 100644
index 0000000..1a7a38b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Represents a TaskStart. Fired right before Task.call() is invoked.
+ */
+@EvaluatorSide
+@Public
+public interface TaskStart extends Identifiable {
+
+  /**
+   * @return task identifier.
+   */
+  @Override
+  String getId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java
new file mode 100644
index 0000000..3a77dbb
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Represents a Task stop. Fired right after Task.call() returned.
+ */
+@EvaluatorSide
+@Public
+public interface TaskStop extends Identifiable {
+
+  /**
+   * @return the task identifier
+   */
+  public String getId();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java
new file mode 100644
index 0000000..3cd0db1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+/**
+ * A basic Builder pattern interface.
+ *
+ * @param <T> The type of object to be built.
+ */
+public interface Builder<T> {
+
+  /**
+   * Builds a fresh instance of the object. This can be invoked several times,
+   * each of which return a new instance.
+   *
+   * @return a fresh instance of the object.
+   */
+  public T build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java
new file mode 100644
index 0000000..146a161
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * run given command and return the result as string
+ */
+final public class CommandUtils {
+  /**
+   * Standard java logger.
+   */
+  private static final Logger LOG = Logger.getLogger(CommandUtils.class.getName());
+
+  public final static String runCommand(final String command) {
+    final StringBuilder sb = new StringBuilder();
+    try {
+      final String cmd = OSUtils.isWindows() ? "cmd.exe /c " + command : command;
+      final Process proc = Runtime.getRuntime().exec(cmd);
+
+      try (final BufferedReader input =
+               new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
+        String line;
+        while ((line = input.readLine()) != null) {
+          sb.append(line).append('\n');
+        }
+      }
+    } catch (final IOException ex) {
+      LOG.log(Level.SEVERE, "Error in call: " + command, ex);
+      sb.append(ex);
+    }
+    return sb.toString();
+  }
+}