You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:07 UTC
[34/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
new file mode 100644
index 0000000..8084068
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/JavaLaunchCommandBuilder.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.runtime.common.Launcher;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.util.EnvironmentUtils;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public final class JavaLaunchCommandBuilder implements LaunchCommandBuilder {
+ private static final String DEFAULT_JAVA_PATH = System.getenv("JAVA_HOME") + "/bin/" + "java";
+ private String stderrPath = null;
+ private String stdoutPath = null;
+ private String errorHandlerRID = null;
+ private String launchID = null;
+ private int megaBytes = 0;
+ private String evaluatorConfigurationPath = null;
+ private String javaPath = null;
+ private String classPath = null;
+ private Boolean assertionsEnabled = null;
+
+ @Override
+ public List<String> build() {
+ return new ArrayList<String>() {{
+
+ if (javaPath == null || javaPath.isEmpty()) {
+ add(DEFAULT_JAVA_PATH);
+ } else {
+ add(javaPath);
+ }
+
+ add("-XX:PermSize=128m");
+ add("-XX:MaxPermSize=128m");
+ // Set Xmx based on am memory size
+ add("-Xmx" + megaBytes + "m");
+
+ if ((assertionsEnabled != null && assertionsEnabled)
+ || EnvironmentUtils.areAssertionsEnabled()) {
+ add("-ea");
+ }
+
+ if (classPath != null && !classPath.isEmpty()) {
+ add("-classpath");
+ add(classPath);
+ }
+
+ Launcher.propagateProperties(this, true, "proc_reef");
+ Launcher.propagateProperties(this, false,
+ "java.util.logging.config.file", "java.util.logging.config.class");
+
+ add(Launcher.class.getName());
+
+ add("-" + ErrorHandlerRID.SHORT_NAME);
+ add(errorHandlerRID);
+ add("-" + LaunchID.SHORT_NAME);
+ add(launchID);
+ add("-" + ClockConfigurationPath.SHORT_NAME);
+ add(evaluatorConfigurationPath);
+
+ if (stdoutPath != null && !stdoutPath.isEmpty()) {
+ add("1>");
+ add(stdoutPath);
+ }
+
+ if (stderrPath != null && !stderrPath.isEmpty()) {
+ add("2>");
+ add(stderrPath);
+ }
+ }};
+ }
+
+ @Override
+ public JavaLaunchCommandBuilder setErrorHandlerRID(final String errorHandlerRID) {
+ this.errorHandlerRID = errorHandlerRID;
+ return this;
+ }
+
+ @Override
+ public JavaLaunchCommandBuilder setLaunchID(final String launchID) {
+ this.launchID = launchID;
+ return this;
+ }
+
+ @Override
+ public JavaLaunchCommandBuilder setMemory(final int megaBytes) {
+ this.megaBytes = megaBytes;
+ return this;
+ }
+
+ @Override
+ public JavaLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) {
+ this.evaluatorConfigurationPath = configurationFileName;
+ return this;
+ }
+
+ @Override
+ public JavaLaunchCommandBuilder setStandardOut(final String standardOut) {
+ this.stdoutPath = standardOut;
+ return this;
+ }
+
+ @Override
+ public JavaLaunchCommandBuilder setStandardErr(final String standardErr) {
+ this.stderrPath = standardErr;
+ return this;
+ }
+
+ /**
+ * Set the path to the java executable. Will default to a heuristic search if not set.
+ *
+ * @param path
+ * @return this
+ */
+ public JavaLaunchCommandBuilder setJavaPath(final String path) {
+ this.javaPath = path;
+ return this;
+ }
+
+ public JavaLaunchCommandBuilder setClassPath(final String classPath) {
+ this.classPath = classPath;
+ return this;
+ }
+
+ public JavaLaunchCommandBuilder setClassPath(final Collection<String> classPathElements) {
+ this.classPath = StringUtils.join(classPathElements, File.pathSeparatorChar);
+ return this;
+ }
+
+ /**
+ * Enable or disable assertions on the child process.
+ * If not set, the setting is taken from the JVM that executes the code.
+ *
+ * @param assertionsEnabled
+ * @return this
+ */
+ public JavaLaunchCommandBuilder enableAssertions(final boolean assertionsEnabled) {
+ this.assertionsEnabled = assertionsEnabled;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java
new file mode 100644
index 0000000..8c40439
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchClass.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler;
+import org.apache.reef.runtime.common.launch.parameters.ClockConfigurationPath;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.*;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.REEFVersion;
+import org.apache.reef.wake.profiler.WakeProfiler;
+import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This encapsulates processes started by REEF.
+ */
+public final class LaunchClass implements AutoCloseable, Runnable {
+
+ private static final Logger LOG = Logger.getLogger(LaunchClass.class.getName());
+ private final RemoteManager remoteManager;
+ private final String launchID;
+ private final String errorHandlerID;
+ private final String evaluatorConfigurationPath;
+ private final boolean isProfilingEnabled;
+ private final REEFErrorHandler errorHandler;
+ private final ConfigurationSerializer configurationSerializer;
+ private WakeProfiler profiler;
+
+ @Inject
+ LaunchClass(final RemoteManager remoteManager,
+ final REEFUncaughtExceptionHandler uncaughtExceptionHandler,
+ final REEFErrorHandler errorHandler,
+ final @Parameter(LaunchID.class) String launchID,
+ final @Parameter(ErrorHandlerRID.class) String errorHandlerID,
+ final @Parameter(ClockConfigurationPath.class) String evaluatorConfigurationPath,
+ final @Parameter(ProfilingEnabled.class) boolean enableProfiling,
+ final ConfigurationSerializer configurationSerializer,
+ final REEFVersion reefVersion) {
+ reefVersion.logVersion();
+ this.remoteManager = remoteManager;
+ this.launchID = launchID;
+ this.errorHandlerID = errorHandlerID;
+ this.evaluatorConfigurationPath = evaluatorConfigurationPath;
+ this.isProfilingEnabled = enableProfiling;
+ this.errorHandler = errorHandler;
+ this.configurationSerializer = configurationSerializer;
+
+
+ // Registering a default exception handler. It sends every exception to the upstream RemoteManager
+ Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
+
+
+ if (isProfilingEnabled) {
+ this.profiler = new WakeProfiler();
+ ProfilingStopHandler.setProfiler(profiler); // TODO: This probably should be bound via Tang.
+ }
+ }
+
+ /**
+ * Loads the client and resource manager configuration files from disk.
+ */
+ private Configuration getClockConfiguration() {
+ return Configurations.merge(readConfigurationFromDisk(), getStaticClockConfiguration());
+ }
+
+
+ private Configuration readConfigurationFromDisk() {
+ LOG.log(Level.FINEST, "Loading configuration file: {0}", this.evaluatorConfigurationPath);
+
+ final File evaluatorConfigFile = new File(this.evaluatorConfigurationPath);
+
+ if (!evaluatorConfigFile.exists()) {
+ final String message = "The configuration file " + this.evaluatorConfigurationPath +
+ "doesn't exist. This points to an issue in the job submission.";
+ fail(message, new FileNotFoundException());
+ throw new RuntimeException(message);
+ } else if (!evaluatorConfigFile.canRead()) {
+ final String message = "The configuration file " + this.evaluatorConfigurationPath + " exists, but can't be read";
+ fail(message, new IOException());
+ throw new RuntimeException(message);
+ } else {
+ try {
+ return this.configurationSerializer.fromFile(evaluatorConfigFile);
+ } catch (final IOException e) {
+ final String message = "Unable to parse the configuration file " + this.evaluatorConfigurationPath;
+ fail(message, e);
+ throw new RuntimeException(message, e);
+ }
+ }
+ }
+
+ /**
+ * @return the part of the clock configuration *not* read from disk.
+ */
+ private Configuration getStaticClockConfiguration() {
+ final JavaConfigurationBuilder builder = Tang.Factory.getTang().newConfigurationBuilder()
+ .bindNamedParameter(LaunchID.class, this.launchID)
+ .bindNamedParameter(ErrorHandlerRID.class, this.errorHandlerID)
+ .bindSetEntry(Clock.StartHandler.class, PIDStoreStartHandler.class)
+ .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
+ .bindNamedParameter(RemoteConfiguration.ManagerName.class, "REEF_LAUNCHER")
+ .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class);
+ if (this.isProfilingEnabled) {
+ builder.bindSetEntry(Clock.StopHandler.class, ProfilingStopHandler.class);
+ }
+ return builder.build();
+ }
+
+ /**
+ * Instantiates the clock.
+ *
+ * @return a clock object.
+ */
+ private Clock getClock() {
+ try {
+ final Injector clockInjector = Tang.Factory.getTang().newInjector(this.getClockConfiguration());
+ if (isProfilingEnabled) {
+ clockInjector.bindAspect(profiler);
+ }
+ clockInjector.bindVolatileInstance(RemoteManager.class, this.remoteManager);
+ return clockInjector.getInstance(Clock.class);
+ } catch (final Throwable ex) {
+ fail("Unable to instantiate the clock", ex);
+ throw new RuntimeException("Unable to instantiate the clock", ex);
+ }
+ }
+
+ /**
+ * Starts the Clock.
+ * This blocks until the clock returns.
+ */
+ @Override
+ public void run() {
+ LOG.entering(this.getClass().getName(), "run", "Starting the clock");
+ try {
+ this.getClock().run();
+ } catch (final Throwable t) {
+ fail("Fatal exception while executing the clock", t);
+ }
+ LOG.exiting(this.getClass().getName(), "run", "Clock terminated");
+ }
+
+ /**
+ * Closes the remote manager managed by this class.
+ *
+ * @throws Exception
+ */
+ @Override
+ public void close() throws Exception {
+ LOG.entering(this.getClass().getName(), "close");
+ this.errorHandler.close(); // Also closes the remoteManager
+ LOG.exiting(this.getClass().getName(), "close");
+ }
+
+ private void fail(final String message, final Throwable throwable) {
+ this.errorHandler.onNext(new Exception(message, throwable));
+ }
+
+ @NamedParameter(doc = "If true, profiling will be enabled", short_name = "profiling", default_value = "false")
+ public final static class ProfilingEnabled implements Name<Boolean> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
new file mode 100644
index 0000000..6874b83
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LaunchCommandBuilder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import java.util.List;
+
+/**
+ * Used to build the launch command for REEF processes.
+ */
+public interface LaunchCommandBuilder {
+
+
+ /**
+ * @return the launch command line
+ */
+ public List<String> build();
+
+ public LaunchCommandBuilder setErrorHandlerRID(final String errorHandlerRID);
+
+ public LaunchCommandBuilder setLaunchID(final String launchID);
+
+ public LaunchCommandBuilder setMemory(final int megaBytes);
+
+ /**
+ * Set the name of the configuration file for the Launcher. This file is assumed to exist in the working directory of
+ * the process launched with this command line.
+ *
+ * @param configurationFileName
+ * @return this
+ */
+ public LaunchCommandBuilder setConfigurationFileName(final String configurationFileName);
+
+ /**
+ * Names a file to which stdout will be redirected.
+ *
+ * @param standardOut
+ * @return this
+ */
+ public LaunchCommandBuilder setStandardOut(final String standardOut);
+
+ /**
+ * Names a file to which stderr will be redirected.
+ *
+ * @param standardErr
+ * @return this
+ */
+ public LaunchCommandBuilder setStandardErr(final String standardErr);
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/LauncherSingletons.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java
new file mode 100644
index 0000000..3ddabb0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/ProfilingStopHandler.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.profiler.WakeProfiler;
+import org.apache.reef.wake.time.event.StopTime;
+
+import javax.inject.Inject;
+import java.io.FileNotFoundException;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * An EventHandler that writes out the profiler results.
+ */
+final class ProfilingStopHandler implements EventHandler<StopTime> {
+ private static final Logger LOG = Logger.getLogger(ProfilingStopHandler.class.getName());
+ private static WakeProfiler profiler;
+ private final String launchID;
+
+ @Inject
+ public ProfilingStopHandler(final @Parameter(LaunchID.class) String launchID) {
+ this.launchID = launchID;
+ }
+
+ static void setProfiler(final WakeProfiler profiler) {
+ ProfilingStopHandler.profiler = profiler;
+ }
+
+ @Override
+ public void onNext(final StopTime stopTime) {
+ try (final PrintWriter out = new PrintWriter("profile-" + launchID + ".json")) {
+ out.print(profiler.objectGraphToString());
+ } catch (final FileNotFoundException e) {
+ LOG.log(Level.WARNING, "Unable to write the profile", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
new file mode 100644
index 0000000..544fcc6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFErrorHandler.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.launch.parameters.ErrorHandlerRID;
+import org.apache.reef.runtime.common.launch.parameters.LaunchID;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * The error handler REEF registers with Wake.
+ */
+public final class REEFErrorHandler implements EventHandler<Throwable> {
+
+ private static final Logger LOG = Logger.getLogger(REEFErrorHandler.class.getName());
+
+ // This class is used as the ErrorHandler in the RemoteManager. Hence, we need an InjectionFuture here.
+ private final InjectionFuture<RemoteManager> remoteManager;
+ private final String launchID;
+ private final String errorHandlerRID;
+ private final ExceptionCodec exceptionCodec;
+
+ @Inject
+ REEFErrorHandler(final InjectionFuture<RemoteManager> remoteManager,
+ final @Parameter(ErrorHandlerRID.class) String errorHandlerRID,
+ final @Parameter(LaunchID.class) String launchID,
+ final ExceptionCodec exceptionCodec) {
+ this.errorHandlerRID = errorHandlerRID;
+ this.remoteManager = remoteManager;
+ this.launchID = launchID;
+ this.exceptionCodec = exceptionCodec;
+ }
+
+ @Override
+ public void onNext(final Throwable e) {
+ LOG.log(Level.SEVERE, "Uncaught exception.", e);
+ // TODO: This gets a new EventHandler each time an exception is caught. It would be better to cache the handler. But
+ // that introduces threading issues and isn't really worth it, as the JVM typically will be killed once we catch an
+ // Exception in here.
+ if (!this.errorHandlerRID.equals(ErrorHandlerRID.NONE)) {
+ final EventHandler<ReefServiceProtos.RuntimeErrorProto> runtimeErrorHandler = this.remoteManager.get()
+ .getHandler(errorHandlerRID, ReefServiceProtos.RuntimeErrorProto.class);
+ final ReefServiceProtos.RuntimeErrorProto message = ReefServiceProtos.RuntimeErrorProto.newBuilder()
+ .setName("reef")
+ .setIdentifier(launchID)
+ .setMessage(e.getMessage())
+ .setException(ByteString.copyFrom(this.exceptionCodec.toBytes(e)))
+ .build();
+ try {
+ runtimeErrorHandler.onNext(message);
+ } catch (final Throwable t) {
+ LOG.log(Level.SEVERE, "Unable to send the error upstream", t);
+ }
+ } else {
+ LOG.log(Level.SEVERE, "Caught an exception from Wake we cannot send upstream because there is no upstream");
+ }
+ }
+
+ public void close() {
+ try {
+ this.remoteManager.get().close();
+ } catch (final Throwable ex) {
+ LOG.log(Level.SEVERE, "Unable to close the remote manager", ex);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "REEFErrorHandler{" +
+ "remoteManager=" + remoteManager +
+ ", launchID='" + launchID + '\'' +
+ ", errorHandlerRID='" + errorHandlerRID + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
new file mode 100644
index 0000000..8a81184
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFMessageCodec.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import com.google.protobuf.GeneratedMessage;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.REEFProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.wake.remote.Codec;
+
+import javax.inject.Inject;
+
+/**
+ * Codec for REEF's control flow messages.
+ */
+@Private
+@Provided
+@ClientSide
+@DriverSide
+@EvaluatorSide
+public final class REEFMessageCodec implements Codec<GeneratedMessage> {
+
+
+ @Inject
+ private REEFMessageCodec() {
+ }
+
+ @Override
+ public GeneratedMessage decode(final byte[] bytes) {
+ try {
+ final REEFProtocol.REEFMessage message = REEFProtocol.REEFMessage.parseFrom(bytes);
+ if (message.hasJobSubmission()) {
+ return message.getJobSubmission();
+ } else if (message.hasJobControl()) {
+ return message.getJobControl();
+ } else if (message.hasRuntimeError()) {
+ return message.getRuntimeError();
+ } else if (message.hasJobStatus()) {
+ return message.getJobStatus();
+ } else if (message.hasEvaluatorControl()) {
+ return message.getEvaluatorControl();
+ } else if (message.hasEvaluatorHeartBeat()) {
+ return message.getEvaluatorHeartBeat();
+ }
+ throw new RuntimeException("Unable to decode a message: " + message.toString());
+ } catch (final InvalidProtocolBufferException e) {
+ throw new RuntimeException("Unable to decode a message", e);
+ }
+ }
+
+ @Override
+ public byte[] encode(final GeneratedMessage msg) {
+ final REEFProtocol.REEFMessage.Builder message = REEFProtocol.REEFMessage.newBuilder();
+
+ if (msg instanceof ClientRuntimeProtocol.JobSubmissionProto) {
+ message.setJobSubmission((ClientRuntimeProtocol.JobSubmissionProto) msg);
+ } else if (msg instanceof ClientRuntimeProtocol.JobControlProto) {
+ message.setJobControl((ClientRuntimeProtocol.JobControlProto) msg);
+ } else if (msg instanceof ReefServiceProtos.RuntimeErrorProto) {
+ message.setRuntimeError((ReefServiceProtos.RuntimeErrorProto) msg);
+ } else if (msg instanceof ReefServiceProtos.JobStatusProto) {
+ message.setJobStatus((ReefServiceProtos.JobStatusProto) msg);
+ } else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorControlProto) {
+ message.setEvaluatorControl((EvaluatorRuntimeProtocol.EvaluatorControlProto) msg);
+ } else if (msg instanceof EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) {
+ message.setEvaluatorHeartBeat((EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto) msg);
+ } else {
+ throw new RuntimeException("Unable to serialize: " + msg);
+ }
+
+ return message.build().toByteArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
new file mode 100644
index 0000000..5190f86
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/REEFUncaughtExceptionHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * This is used as the Exception handler for REEF client processes (Driver, Evaluator).
+ * <p/>
+ * It catches all exceptions and sends them to the controlling process.
+ * For Evaluators, that is the Driver. For the Driver, that is the Client.
+ * <p/>
+ * After sending the exception, this shuts down the JVM, as this JVM is then officially dead.
+ */
+final class REEFUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+ private static final Logger LOG = Logger.getLogger(REEFUncaughtExceptionHandler.class.getName());
+ private final REEFErrorHandler errorHandler;
+
+
+ /**
+ * @param errorHandler
+ */
+ @Inject
+ REEFUncaughtExceptionHandler(final REEFErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public final synchronized void uncaughtException(final Thread thread, final Throwable throwable) {
+ final String msg = "Thread " + thread.getName() + " threw an uncaught exception.";
+ LOG.log(Level.SEVERE, msg, throwable);
+ this.errorHandler.onNext(new Exception(msg, throwable));
+ try {
+ this.wait(100); // TODO: Remove
+ } catch (final InterruptedException e) {
+
+ }
+ this.errorHandler.close();
+ LOG.log(Level.SEVERE, "System.exit(1)");
+ System.exit(1);
+ }
+
+ @Override
+ public String toString() {
+ return "REEFUncaughtExceptionHandler{" +
+ "errorHandler=" + errorHandler +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java
new file mode 100644
index 0000000..fef18f4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Common launch code between Driver and Evaluator.
+ */
+package org.apache.reef.runtime.common.launch;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java
new file mode 100644
index 0000000..3715774
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ClockConfigurationPath.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The path to the clock configuration
+ */
+@NamedParameter(doc = "The path to process configuration.", short_name = ClockConfigurationPath.SHORT_NAME)
+public final class ClockConfigurationPath implements Name<String> {
+ public static final String SHORT_NAME = "runtime_configuration";
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java
new file mode 100644
index 0000000..0285ad0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/ErrorHandlerRID.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The error handler remote identifier.", short_name = ErrorHandlerRID.SHORT_NAME, default_value = ErrorHandlerRID.NONE)
+public final class ErrorHandlerRID implements Name<String> {
+ /**
+ * Indicates that no ErrorHandler is bound.
+ */
+ // TODO: Find all comparisons with this and see whether we can do something about them
+ public static final String NONE = "NO_ERROR_HANDLER_REMOTE_ID";
+
+ /**
+ * Short name for the command line and such.
+ */
+ public static final String SHORT_NAME = "error_handler_rid";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java
new file mode 100644
index 0000000..7dc8546
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/LaunchID.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+@NamedParameter(doc = "The launch identifier.", short_name = LaunchID.SHORT_NAME)
+public final class LaunchID implements Name<String> {
+ public static final String SHORT_NAME = "launch_id";
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java
new file mode 100644
index 0000000..155c399
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/parameters/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Parameters of the Launcher.
+ */
+package org.apache.reef.runtime.common.launch.parameters;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java
new file mode 100644
index 0000000..57a7e54
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * REEF implementation.
+ * This is referred to as the `common resourcemanager` that contains code
+ * common to implementations of REEF on all resource managers.
+ * The individual resource managers are implemented by providing implementations
+ * of the various interfaces presribed in sub-packages called `API`.
+ */
+package org.apache.reef.runtime.common;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java
new file mode 100644
index 0000000..891d890
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/DeleteTempFiles.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * Whether or not to delete the temporary files created during Driver and Evaluator submission.
+ */
+@NamedParameter(doc = "Whether or not to delete the temporary files created during Driver and Evaluator submission.", default_value = "true")
+public final class DeleteTempFiles implements Name<Boolean> {
+ private DeleteTempFiles() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java
new file mode 100644
index 0000000..3caa66b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/parameters/JVMHeapSlack.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The fraction of the container memory NOT to use for the Java Heap.
+ */
+@NamedParameter(doc = "The fraction of the container memory NOT to use for the Java Heap.", default_value = "0.0")
+public final class JVMHeapSlack implements Name<Double> {
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java
new file mode 100644
index 0000000..49fe345
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/BroadCastEventHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.wake.EventHandler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class BroadCastEventHandler<E> implements EventHandler<E> {
+ private final List<EventHandler<E>> handlers;
+
+ public BroadCastEventHandler(final Collection<EventHandler<E>> handlers) {
+ this.handlers = new ArrayList<>(handlers);
+ }
+
+ @Override
+ public void onNext(final E event) {
+ for (final EventHandler<E> handler : handlers)
+ handler.onNext(event);
+ }
+
+ public void addEventHandler(final EventHandler<E> eventHandler) {
+ this.handlers.add(eventHandler);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java
new file mode 100644
index 0000000..1a80f96
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DefaultExceptionCodec.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.commons.lang.SerializationException;
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default implementation of ExceptionCodec that uses Java serialization as its implementation.
+ */
+@Private
+final class DefaultExceptionCodec implements ExceptionCodec {
+ private static final Logger LOG = Logger.getLogger(DefaultExceptionCodec.class.getName());
+
+ @Inject
+ DefaultExceptionCodec() {
+ }
+
+ @Override
+ public Optional<Throwable> fromBytes(final byte[] bytes) {
+ try {
+ return Optional.<Throwable>of((Throwable) SerializationUtils.deserialize(bytes));
+ } catch (SerializationException | IllegalArgumentException e) {
+ LOG.log(Level.FINE, "Unable to deserialize a Throwable.", e);
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Optional<Throwable> fromBytes(final Optional<byte[]> bytes) {
+ if (bytes.isPresent()) {
+ return this.fromBytes(bytes.get());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public byte[] toBytes(final Throwable throwable) {
+ return SerializationUtils.serialize(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
new file mode 100644
index 0000000..64c32e8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.tang.util.MonotonicHashMap;
+import org.apache.reef.util.ExceptionHandlingEventHandler;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Delayed event router that dispatches messages to the proper event handler by type.
+ * This class is used in EvaluatorManager to isolate user threads from REEF.
+ */
+@Private
+@DriverSide
+public final class DispatchingEStage implements AutoCloseable {
+
+ /**
+ * A map of event handlers, populated in the register() method.
+ */
+ private final Map<Class<?>, EventHandler<?>> handlers =
+ Collections.synchronizedMap(new MonotonicHashMap<Class<?>, EventHandler<?>>());
+ /**
+ * Exception handler, one for all event handlers.
+ */
+ private final EventHandler<Throwable> errorHandler;
+ /**
+ * Thread pool to process delayed event handler invocations.
+ */
+ private final ThreadPoolStage<DelayedOnNext> stage;
+
+ /**
+ * @param errorHandler used for exceptions thrown from the event handlers registered.
+ * @param numThreads number of threads to allocate to dispatch events.
+ * @param stageName the name to use for the underlying stage. It will be carried over to name the Thread(s) spawned.
+ */
+ public DispatchingEStage(final EventHandler<Throwable> errorHandler,
+ final int numThreads,
+ final String stageName) {
+ this.errorHandler = errorHandler;
+ this.stage = new ThreadPoolStage<>(stageName,
+ new EventHandler<DelayedOnNext>() {
+ @Override
+ public void onNext(final DelayedOnNext promise) {
+ promise.handler.onNext(promise.message);
+ }
+ }, numThreads
+ );
+
+ }
+
+ /**
+ * Constructs a DispatchingEStage that uses the Thread pool and ErrorHandler of another one.
+ *
+ * @param other
+ */
+ public DispatchingEStage(final DispatchingEStage other) {
+ this.errorHandler = other.errorHandler;
+ this.stage = other.stage;
+ }
+
+ /**
+ * Register a new event handler.
+ *
+ * @param type Message type to process with this handler.
+ * @param handlers A set of handlers that process that type of message.
+ * @param <T> Message type.
+ * @param <U> Type of message that event handler supports. Must be a subclass of T.
+ */
+ public <T, U extends T> void register(final Class<T> type, final Set<EventHandler<U>> handlers) {
+ this.handlers.put(type, new ExceptionHandlingEventHandler<>(
+ new BroadCastEventHandler<>(handlers), this.errorHandler));
+ }
+
+ /**
+ * Dispatch a new message by type.
+ *
+ * @param type Type of event handler - must match the register() call.
+ * @param message A message to process. Must be a subclass of T.
+ * @param <T> Message type that event handler supports.
+ * @param <U> input message type. Must be a subclass of T.
+ */
+ @SuppressWarnings("unchecked")
+ public <T, U extends T> void onNext(final Class<T> type, final U message) {
+ final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
+ this.stage.onNext(new DelayedOnNext(handler, message));
+ }
+
+ /**
+ * Return true if there are no messages queued or in processing, false otherwise.
+ */
+ public boolean isEmpty() {
+ return this.stage.getQueueLength() == 0;
+ }
+
+ /**
+ * Close the internal thread pool.
+ *
+ * @throws Exception forwarded from EStage.close() call.
+ */
+ @Override
+ public void close() throws Exception {
+ this.stage.close();
+ }
+
+ /**
+ * Delayed EventHandler.onNext() call.
+ * Contains a message object and EventHandler to process it.
+ */
+ private static final class DelayedOnNext {
+
+ public final EventHandler<Object> handler;
+ public final Object message;
+
+ @SuppressWarnings("unchecked")
+ public <T, U extends T> DelayedOnNext(final EventHandler<T> handler, final U message) {
+ this.handler = (EventHandler<Object>) handler;
+ this.message = message;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java
new file mode 100644
index 0000000..37fa264
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/ExceptionCodec.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.tang.annotations.DefaultImplementation;
+import org.apache.reef.util.Optional;
+
+/**
+ * (De-)serializes exceptions.
+ */
+@DefaultImplementation(DefaultExceptionCodec.class)
+public interface ExceptionCodec {
+
+ /**
+ * Deserializes a Throwable that has been serialized using toBytes().
+ *
+ * @param bytes
+ * @return the Throable or Optional.empty() if the deserialization fails.
+ */
+ public Optional<Throwable> fromBytes(final byte[] bytes);
+
+ /**
+ * @param bytes
+ * @return fromBytes(bytes.get()) or Optional.empty()
+ */
+ public Optional<Throwable> fromBytes(final Optional<byte[]> bytes);
+
+ /**
+ * @param throwable
+ * @return the serialized form of the given Throwable.
+ */
+ public byte[] toBytes(final Throwable throwable);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
new file mode 100644
index 0000000..e17108e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/RemoteManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteIdentifierFactory;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+public class RemoteManager {
+
+ private static final Logger LOG = Logger.getLogger(RemoteManager.class.getName());
+
+ private final org.apache.reef.wake.remote.RemoteManager raw;
+ private final RemoteIdentifierFactory factory;
+
+ @Inject
+ public RemoteManager(final org.apache.reef.wake.remote.RemoteManager raw,
+ final RemoteIdentifierFactory factory) {
+ this.raw = raw;
+ this.factory = factory;
+ LOG.log(Level.FINE, "Instantiated 'RemoteManager' with remoteId: {0}", this.getMyIdentifier());
+ }
+
+ public final org.apache.reef.wake.remote.RemoteManager raw() {
+ return this.raw;
+ }
+
+ public void close() throws Exception {
+ this.raw.close();
+ }
+
+ public <T> EventHandler<T> getHandler(
+ final String destinationIdentifier, final Class<? extends T> messageType) {
+ return this.raw.getHandler(factory.getNewInstance(destinationIdentifier), messageType);
+ }
+
+ public <T, U extends T> AutoCloseable registerHandler(
+ final String sourceIdentifier, final Class<U> messageType,
+ final EventHandler<T> theHandler) {
+ return this.raw.registerHandler(factory.getNewInstance(sourceIdentifier), messageType, theHandler);
+ }
+
+ public <T, U extends T> AutoCloseable registerHandler(
+ final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler) {
+ return this.raw.registerHandler(messageType, theHandler);
+ }
+
+ public AutoCloseable registerErrorHandler(final EventHandler<Exception> theHandler) {
+ return this.raw.registerErrorHandler(theHandler);
+ }
+
+ public String getMyIdentifier() {
+ return this.raw.getMyIdentifier().toString();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java
new file mode 100644
index 0000000..7137420
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/HeartBeatTriggerManager.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+
+import javax.inject.Inject;
+
+/**
+ * Helper class for immediately sending heartbeat messages.
+ * This can be used together with TaskMessageSource to push urgent messages to the Driver.
+ * <p/>
+ * CAUTION: Do not overuse as the Driver can be saturated with heartbeats.
+ *
+ * @see https://issues.apache.org/jira/browse/REEF-33 for the ongoing discussion of alternatives to this design.
+ */
+@TaskSide
+@Public
+@Unstable
+public class HeartBeatTriggerManager {
+ private final HeartBeatManager heartBeatManager;
+
+ @Inject
+ HeartBeatTriggerManager(final HeartBeatManager heartBeatManager) {
+ this.heartBeatManager = heartBeatManager;
+ }
+
+ /**
+ * Immediately send a heartbeat message to the Driver.
+ */
+ public void triggerHeartBeat() {
+ this.heartBeatManager.sendHeartbeat();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java
new file mode 100644
index 0000000..b042558
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/Task.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+
+import java.util.concurrent.Callable;
+
+/**
+ * The interface for Tasks.
+ * <p/>
+ * This interface is to be implemented for Tasks.
+ * <p/>
+ * The main entry point for a Task is the call() method inherited from
+ * {@link Callable}. The REEF Evaluator will call this method in order to run
+ * the Task. The byte[] returned by it will be pushed to the Job Driver.
+ */
+@TaskSide
+@Public
+public interface Task {
+
+ /**
+ * Called by the resourcemanager harness to execute the task.
+ *
+ * @param memento the memento objected passed down by the driver.
+ * @return the user defined return value
+ * @throws Exception whenever the Task encounters an unsolved issue.
+ * This Exception will be thrown at the Driver's event handler.
+ */
+ public byte[] call(final byte[] memento) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java
new file mode 100644
index 0000000..60c8c33
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessage.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.Message;
+
+/**
+ * A message sent from a Task to a Driver.
+ */
+@EvaluatorSide
+@Public
+@Provided
+public final class TaskMessage implements Message {
+
+ private final String messageSourceID;
+ private final byte[] theBytes;
+
+ private TaskMessage(final String messageSourceID, final byte[] theBytes) {
+ this.messageSourceID = messageSourceID;
+ this.theBytes = theBytes;
+ }
+
+ /**
+ * @param messageSourceID The message's sourceID. This will be accessible in the Driver for routing.
+ * @param theBytes The actual content of the message, serialized into a byte[]
+ * @return a new TaskMessage with the given content.
+ */
+ public static TaskMessage from(final String messageSourceID, final byte[] theBytes) {
+ assert (theBytes != null && messageSourceID != null);
+ return new TaskMessage(messageSourceID, theBytes);
+ }
+
+ /**
+ * @return the message source identifier.
+ */
+ public String getMessageSourceID() {
+ return this.messageSourceID;
+ }
+
+ /**
+ * @return the message
+ */
+ @Override
+ public byte[] get() {
+ return this.theBytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java
new file mode 100644
index 0000000..38077e0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/TaskMessageSource.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.util.Optional;
+
+/**
+ * Message source for control flow messages from a task to the Driver.
+ * <p/>
+ * The getMessage() method in an Implementation of this interface will be called by the Evaluator resourcemanager whenever it is
+ * about to communicate with the Driver anyway. Hence, this can be used for occasional status updates etc.
+ */
+@Public
+@EvaluatorSide
+public interface TaskMessageSource {
+
+ /**
+ * @return a message to be sent back to the Driver, or Optional.empty() if no message shall be sent at this time.
+ */
+ public Optional<TaskMessage> getMessage();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java
new file mode 100644
index 0000000..106bc5f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/CloseEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.util.Optional;
+
+/**
+ * Indicates that the driver called .close() on this Task.
+ */
+@TaskSide
+@Provided
+@Public
+public interface CloseEvent {
+
+ /**
+ * @return the message sent with the close call, if any.
+ */
+ public Optional<byte[]> get();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java
new file mode 100644
index 0000000..dbfe025
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/DriverMessage.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.util.Optional;
+
+/**
+ * Represents a message sent by the driver.
+ */
+@TaskSide
+@Public
+@Provided
+public interface DriverMessage {
+
+ /**
+ * @return the message sent by the driver.
+ */
+ public Optional<byte[]> get();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java
new file mode 100644
index 0000000..f23f3bf
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/SuspendEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.annotations.audience.TaskSide;
+import org.apache.reef.util.Optional;
+
+/**
+ * Event fired when the driver called suspend() on this task.
+ */
+@TaskSide
+@Provided
+@Public
+public interface SuspendEvent {
+
+ /**
+ * @return the message sent with the suspend call, if any.
+ */
+ public Optional<byte[]> get();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java
new file mode 100644
index 0000000..1a7a38b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStart.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Represents a TaskStart. Fired right before Task.call() is invoked.
+ */
+@EvaluatorSide
+@Public
+public interface TaskStart extends Identifiable {
+
+ /**
+ * @return task identifier.
+ */
+ @Override
+ String getId();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java
new file mode 100644
index 0000000..3a77dbb
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/task/events/TaskStop.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.task.events;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.io.naming.Identifiable;
+
+/**
+ * Represents a Task stop. Fired right after Task.call() returned.
+ */
+@EvaluatorSide
+@Public
+public interface TaskStop extends Identifiable {
+
+ /**
+ * @return the task identifier
+ */
+ public String getId();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java
new file mode 100644
index 0000000..3cd0db1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/Builder.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+/**
+ * A basic Builder pattern interface.
+ *
+ * @param <T> The type of object to be built.
+ */
+public interface Builder<T> {
+
+ /**
+ * Builds a fresh instance of the object. This can be invoked several times,
+ * each of which return a new instance.
+ *
+ * @return a fresh instance of the object.
+ */
+ public T build();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java b/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java
new file mode 100644
index 0000000..146a161
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/util/CommandUtils.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * run given command and return the result as string
+ */
+final public class CommandUtils {
+ /**
+ * Standard java logger.
+ */
+ private static final Logger LOG = Logger.getLogger(CommandUtils.class.getName());
+
+ public final static String runCommand(final String command) {
+ final StringBuilder sb = new StringBuilder();
+ try {
+ final String cmd = OSUtils.isWindows() ? "cmd.exe /c " + command : command;
+ final Process proc = Runtime.getRuntime().exec(cmd);
+
+ try (final BufferedReader input =
+ new BufferedReader(new InputStreamReader(proc.getInputStream()))) {
+ String line;
+ while ((line = input.readLine()) != null) {
+ sb.append(line).append('\n');
+ }
+ }
+ } catch (final IOException ex) {
+ LOG.log(Level.SEVERE, "Error in call: " + command, ex);
+ sb.append(ex);
+ }
+ return sb.toString();
+ }
+}