You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:08 UTC
[35/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java
new file mode 100644
index 0000000..5b502d1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/CloseEventImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.util.Optional;
+
+final class CloseEventImpl implements CloseEvent { // CloseEvent implementation that holds an optional byte[] payload
+ private final Optional<byte[]> value; // the payload returned by get(); assigned exactly once by either constructor
+
+ CloseEventImpl() { // creates an event that carries no payload
+ this.value = Optional.empty();
+ }
+
+ CloseEventImpl(final byte[] theBytes) { // creates an event around theBytes; TaskRuntime.close() documents that its message may be null
+ this.value = Optional.ofNullable(theBytes); // presumably a null theBytes becomes an empty Optional - confirm against org.apache.reef.util.Optional
+ }
+
+ @Override
+ public Optional<byte[]> get() { // returns the Optional built in the constructor; the array inside is not copied
+ return this.value;
+ }
+
+ @Override
+ public String toString() { // debugging aid: prints the Optional wrapper, not the decoded payload
+ return "CloseEvent{value=" + this.value + '}';
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java
new file mode 100644
index 0000000..d1a492e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/DriverMessageImpl.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.util.Optional;
+
+final class DriverMessageImpl implements DriverMessage { // DriverMessage implementation that holds an optional byte[] payload
+ private final Optional<byte[]> value; // the payload returned by get(); assigned exactly once by either constructor
+
+ DriverMessageImpl() { // creates a message that carries no payload
+ this.value = Optional.empty();
+ }
+
+ DriverMessageImpl(final byte[] theBytes) { // creates a message around theBytes
+ this.value = Optional.ofNullable(theBytes); // presumably a null theBytes becomes an empty Optional - confirm against org.apache.reef.util.Optional
+ }
+
+ @Override
+ public Optional<byte[]> get() { // returns the Optional built in the constructor; the array inside is not copied
+ return this.value;
+ }
+
+ @Override
+ public String toString() { // debugging aid: prints the Optional wrapper, not the decoded payload
+ return "DriverMessage{value=" + this.value + '}';
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java
new file mode 100644
index 0000000..a1c0f0d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/SuspendEventImpl.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+
+final class SuspendEventImpl implements SuspendEvent { // SuspendEvent implementation that holds an optional byte[] payload
+ private final Optional<byte[]> value; // the payload returned by get(); assigned exactly once by either constructor
+
+ SuspendEventImpl() { // creates an event that carries no payload
+ this.value = Optional.empty();
+ }
+
+ SuspendEventImpl(final byte[] theBytes) { // creates an event around theBytes; TaskRuntime.suspend() documents that its message may be null
+ this.value = Optional.ofNullable(theBytes); // presumably a null theBytes becomes an empty Optional - confirm against org.apache.reef.util.Optional
+ }
+
+ @Override
+ public Optional<byte[]> get() { // returns the Optional built in the constructor; the array inside is not copied
+ return this.value;
+ }
+
+ @Override
+ public String toString() { // debugging aid: prints the Optional wrapper, not the decoded payload
+ return "SuspendEvent{value=" + this.value + '}';
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java
new file mode 100644
index 0000000..1e0cc5e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskClientCodeException.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+/**
+ * Thrown by REEF's resourcemanager code when it catches an exception thrown by user code.
+ */
+public final class TaskClientCodeException extends Exception {
+
+ private final String taskId;
+ private final String contextId;
+
+ /**
+ * @param taskId the id of the failed task.
+ * @param contextId the ID of the context the failed Task was executing in.
+ * @param message the error message; it is appended to a prefix naming the task and context ids.
+ * @param cause the exception that caused the Task to fail.
+ */
+ public TaskClientCodeException(final String taskId,
+ final String contextId,
+ final String message,
+ final Throwable cause) {
+ super("Failure in task '" + taskId + "' in context '" + contextId + "': " + message, cause);
+ this.taskId = taskId;
+ this.contextId = contextId;
+ }
+
+ /**
+ * Extracts a task id from the given configuration by injecting TaskConfigurationOptions.Identifier from it.
+ *
+ * @param config the configuration to read the task identifier from.
+ * @return the task id in the given configuration.
+ * @throws RuntimeException if the identifier can't be injected; the InjectionException is kept as the cause.
+ */
+ public static String getTaskId(final Configuration config) {
+ try {
+ return Tang.Factory.getTang().newInjector(config).getNamedInstance(TaskConfigurationOptions.Identifier.class);
+ } catch (final InjectionException ex) {
+ throw new RuntimeException("Unable to determine task identifier. Giving up.", ex);
+ }
+ }
+
+ /**
+ * @return the ID of the failed Task, as passed to the constructor.
+ */
+ public String getTaskId() {
+ return this.taskId;
+ }
+
+ /**
+ * @return the ID of the context the failed Task was executing in, as passed to the constructor.
+ */
+ public String getContextId() {
+ return this.contextId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java
new file mode 100644
index 0000000..f03ed13
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskLifeCycleHandlers.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.runtime.common.evaluator.task.exceptions.TaskStartHandlerFailure;
+import org.apache.reef.runtime.common.evaluator.task.exceptions.TaskStopHandlerFailure;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Convenience class to send the injected TaskStart and TaskStop events to the handler sets configured for the task.
+ */
+@EvaluatorSide
+@Private
+final class TaskLifeCycleHandlers {
+ private static final Logger LOG = Logger.getLogger(TaskLifeCycleHandlers.class.getName());
+ private final Set<EventHandler<TaskStop>> taskStopHandlers;
+ private final Set<EventHandler<TaskStart>> taskStartHandlers;
+ private final TaskStart taskStart; // the single event instance handed to every start handler
+ private final TaskStop taskStop; // the single event instance handed to every stop handler
+
+ @Inject
+ TaskLifeCycleHandlers(final @Parameter(TaskConfigurationOptions.StopHandlers.class) Set<EventHandler<TaskStop>> taskStopHandlers,
+ final @Parameter(TaskConfigurationOptions.StartHandlers.class) Set<EventHandler<TaskStart>> taskStartHandlers,
+ final TaskStartImpl taskStart,
+ final TaskStopImpl taskStop) {
+ this.taskStopHandlers = taskStopHandlers;
+ this.taskStartHandlers = taskStartHandlers;
+ this.taskStart = taskStart;
+ this.taskStop = taskStop;
+ }
+
+ /**
+ * Sends the TaskStart event to the handlers for it; the first handler that throws ends the dispatch with a TaskStartHandlerFailure.
+ */
+ public void beforeTaskStart() throws TaskStartHandlerFailure {
+ LOG.log(Level.FINEST, "Sending TaskStart event to the registered event handlers.");
+ for (final EventHandler<TaskStart> startHandler : this.taskStartHandlers) {
+ try {
+ startHandler.onNext(this.taskStart);
+ } catch (final Throwable throwable) { // handlers are user code, so anything they throw is caught and wrapped
+ throw new TaskStartHandlerFailure(startHandler, throwable); // remaining handlers are not invoked
+ }
+ }
+ LOG.log(Level.FINEST, "Done sending TaskStart event to the registered event handlers.");
+ }
+
+ /**
+ * Sends the TaskStop event to the handlers for it; the first handler that throws ends the dispatch with a TaskStopHandlerFailure.
+ */
+ public void afterTaskExit() throws TaskStopHandlerFailure {
+ LOG.log(Level.FINEST, "Sending TaskStop event to the registered event handlers.");
+ for (final EventHandler<TaskStop> stopHandler : this.taskStopHandlers) {
+ try {
+ stopHandler.onNext(this.taskStop);
+ } catch (final Throwable throwable) { // handlers are user code, so anything they throw is caught and wrapped
+ throw new TaskStopHandlerFailure(stopHandler, throwable); // remaining handlers are not invoked
+ }
+ }
+ LOG.log(Level.FINEST, "Done sending TaskStop event to the registered event handlers.");
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java
new file mode 100644
index 0000000..ea8dd27
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskRuntime.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+import org.apache.reef.runtime.common.evaluator.task.exceptions.*;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.Task;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import javax.xml.bind.DatatypeConverter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The execution environment for a Task: runs it and relays close, suspend and message requests to the configured handlers.
+ */
+@Private
+@EvaluatorSide
+public final class TaskRuntime implements Runnable {
+
+ private static final Logger LOG = Logger.getLogger(TaskRuntime.class.getName());
+
+ /**
+ * User supplied Task code.
+ */
+ private final Task task;
+
+ private final InjectionFuture<EventHandler<CloseEvent>> f_closeHandler; // handler futures are resolved via get() only when an event is dispatched
+ private final InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler;
+ private final InjectionFuture<EventHandler<DriverMessage>> f_messageHandler;
+ private final TaskLifeCycleHandlers taskLifeCycleHandlers;
+
+ /**
+ * The memento given by the task configuration, Base64-decoded; empty if no memento was supplied.
+ */
+ private final Optional<byte[]> memento;
+
+ /**
+ * Heart beat manager; in this class it serves as the monitor that serializes close(), suspend() and deliver().
+ */
+ private final HeartBeatManager heartBeatManager;
+
+ private final TaskStatus currentStatus;
+
+ // Constructor without a memento parameter: delegates to the full constructor with a null memento.
+ @Inject
+ private TaskRuntime(
+ final HeartBeatManager heartBeatManager,
+ final Task task,
+ final TaskStatus currentStatus,
+ final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler,
+ final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler,
+ final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler,
+ final TaskLifeCycleHandlers taskLifeCycleHandlers) {
+ this(heartBeatManager, task, currentStatus, f_closeHandler, f_suspendHandler, f_messageHandler, null, taskLifeCycleHandlers);
+ }
+
+ // Full constructor: memento is the Base64 text bound to TaskConfigurationOptions.Memento, or null for "no memento".
+ @Inject
+ private TaskRuntime(
+ final HeartBeatManager heartBeatManager,
+ final Task task,
+ final TaskStatus currentStatus,
+ final @Parameter(TaskConfigurationOptions.CloseHandler.class) InjectionFuture<EventHandler<CloseEvent>> f_closeHandler,
+ final @Parameter(TaskConfigurationOptions.SuspendHandler.class) InjectionFuture<EventHandler<SuspendEvent>> f_suspendHandler,
+ final @Parameter(TaskConfigurationOptions.MessageHandler.class) InjectionFuture<EventHandler<DriverMessage>> f_messageHandler,
+ final @Parameter(TaskConfigurationOptions.Memento.class) String memento,
+ final TaskLifeCycleHandlers taskLifeCycleHandlers) {
+
+ this.heartBeatManager = heartBeatManager;
+ this.task = task;
+ this.taskLifeCycleHandlers = taskLifeCycleHandlers;
+
+ this.memento = null == memento ? Optional.<byte[]>empty() :
+ Optional.of(DatatypeConverter.parseBase64Binary(memento));
+
+ this.f_closeHandler = f_closeHandler;
+ this.f_suspendHandler = f_suspendHandler;
+ this.f_messageHandler = f_messageHandler;
+
+ this.currentStatus = currentStatus;
+ }
+
+ /**
+ * This method needs to be called before a Task can be run().
+ * It informs the Driver that the Task is initializing.
+ */
+ public void initialize() {
+ this.currentStatus.setInit();
+ }
+
+ /**
+ * Run the task: Fire TaskStart, call Task.call(), fire TaskStop. Failures are recorded on the task status, not rethrown.
+ */
+ @Override
+ public void run() {
+ try {
+ LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStart>.");
+ this.taskLifeCycleHandlers.beforeTaskStart();
+
+ // Change state and inform the Driver
+ this.currentStatus.setRunning();
+
+ // Call Task.call()
+ final byte[] result = this.runTask();
+
+ // Inform the Driver about it
+ this.currentStatus.setResult(result);
+
+ LOG.log(Level.FINEST, "Informing registered EventHandler<TaskStop>.");
+ this.taskLifeCycleHandlers.afterTaskExit();
+
+ } catch (final TaskStartHandlerFailure taskStartHandlerFailure) {
+ LOG.log(Level.WARNING, "Caught an exception during TaskStart handler execution.", taskStartHandlerFailure);
+ this.currentStatus.setException(taskStartHandlerFailure.getCause());
+ } catch (final TaskStopHandlerFailure taskStopHandlerFailure) {
+ LOG.log(Level.WARNING, "Caught an exception during TaskStop handler execution.", taskStopHandlerFailure);
+ this.currentStatus.setException(taskStopHandlerFailure.getCause());
+ } catch (final TaskCallFailure e) {
+ LOG.log(Level.WARNING, "Caught an exception during Task.call().", e.getCause());
+ this.currentStatus.setException(e);
+ }
+ }
+
+ /**
+ * Called by heartbeat manager.
+ *
+ * @return current TaskStatusProto
+ */
+ public ReefServiceProtos.TaskStatusProto getStatusProto() {
+ return this.currentStatus.toProto();
+ }
+
+ /**
+ * @return true, if the Task is no longer running, either because it is crashed or exited cleanly
+ */
+ public boolean hasEnded() {
+ return this.currentStatus.hasEnded();
+ }
+
+ /**
+ * @return the ID of the task.
+ */
+ public String getTaskId() {
+ return this.currentStatus.getTaskId();
+ }
+
+ public String getId() { // "TASK:" + simple class name of the user Task + ':' + task id
+ return "TASK:" + this.task.getClass().getSimpleName() + ':' + this.currentStatus.getTaskId();
+ }
+
+ /**
+ * Close the Task. This calls the configured close handler; the request is ignored if the task is not running.
+ *
+ * @param message the optional message for the close handler or null if there is none.
+ */
+ public void close(final byte[] message) {
+ LOG.log(Level.FINEST, "Triggering Task close.");
+ synchronized (this.heartBeatManager) {
+ if (this.currentStatus.isNotRunning()) {
+ LOG.log(Level.WARNING, "Trying to close a task that is in state: {0}. Ignoring.",
+ this.currentStatus.getState());
+ } else {
+ try {
+ this.closeTask(message);
+ this.currentStatus.setCloseRequested();
+ } catch (final TaskCloseHandlerFailure taskCloseHandlerFailure) {
+ LOG.log(Level.WARNING, "Exception while executing task close handler.",
+ taskCloseHandlerFailure.getCause());
+ this.currentStatus.setException(taskCloseHandlerFailure.getCause());
+ }
+ }
+ }
+ }
+
+ /**
+ * Suspend the Task. This calls the configured suspend handler; the request is ignored if the task is not running.
+ *
+ * @param message the optional message for the suspend handler or null if there is none.
+ */
+ public void suspend(final byte[] message) {
+ synchronized (this.heartBeatManager) {
+ if (this.currentStatus.isNotRunning()) {
+ LOG.log(Level.WARNING, "Trying to suspend a task that is in state: {0}. Ignoring.",
+ this.currentStatus.getState());
+ } else {
+ try {
+ this.suspendTask(message);
+ this.currentStatus.setSuspendRequested();
+ } catch (final TaskSuspendHandlerFailure taskSuspendHandlerFailure) {
+ LOG.log(Level.WARNING, "Exception while executing task suspend handler.",
+ taskSuspendHandlerFailure.getCause());
+ this.currentStatus.setException(taskSuspendHandlerFailure.getCause());
+ }
+ }
+ }
+ }
+
+ /**
+ * Deliver a message to the Task. This calls into the user supplied message handler; ignored if the task is not running.
+ *
+ * @param message the message to be delivered.
+ */
+ public void deliver(final byte[] message) {
+ synchronized (this.heartBeatManager) {
+ if (this.currentStatus.isNotRunning()) {
+ LOG.log(Level.WARNING,
+ "Trying to send a message to a task that is in state: {0}. Ignoring.",
+ this.currentStatus.getState());
+ } else {
+ try {
+ this.deliverMessageToTask(message);
+ } catch (final TaskMessageHandlerFailure taskMessageHandlerFailure) {
+ LOG.log(Level.WARNING, "Exception while executing task message handler.",
+ taskMessageHandlerFailure.getCause());
+ this.currentStatus.setException(taskMessageHandlerFailure.getCause());
+ }
+ }
+ }
+ }
+
+ /**
+ * @return the ID of the Context this task is executing in.
+ */
+ private String getContextID() {
+ return this.currentStatus.getContextId();
+ }
+
+ /**
+ * Calls the Task.call() method and catches exceptions it may throw.
+ *
+ * @return the return value of Task.call()
+ * @throws TaskCallFailure if any Throwable was caught from the Task.call() method.
+ * That throwable would be the cause of the TaskCallFailure.
+ */
+ private byte[] runTask() throws TaskCallFailure {
+ try {
+ final byte[] result;
+ if (this.memento.isPresent()) {
+ LOG.log(Level.FINEST, "Calling Task.call() with a memento");
+ result = this.task.call(this.memento.get());
+ } else {
+ LOG.log(Level.FINEST, "Calling Task.call() without a memento");
+ result = this.task.call(null);
+ }
+ LOG.log(Level.FINEST, "Task.call() exited cleanly.");
+ return result;
+ } catch (final Throwable throwable) {
+ throw new TaskCallFailure(throwable);
+ }
+ }
+
+ /**
+ * Calls the configured Task close handler and wraps anything it throws in a TaskCloseHandlerFailure.
+ */
+ private void closeTask(final byte[] message) throws TaskCloseHandlerFailure {
+ LOG.log(Level.FINEST, "Invoking close handler.");
+ try {
+ this.f_closeHandler.get().onNext(new CloseEventImpl(message));
+ } catch (final Throwable throwable) {
+ throw new TaskCloseHandlerFailure(throwable);
+ }
+ }
+
+ /**
+ * Calls the configured Task message handler and wraps anything it throws in a TaskMessageHandlerFailure.
+ */
+ private void deliverMessageToTask(final byte[] message) throws TaskMessageHandlerFailure {
+ try {
+ this.f_messageHandler.get().onNext(new DriverMessageImpl(message));
+ } catch (final Throwable throwable) {
+ throw new TaskMessageHandlerFailure(throwable);
+ }
+ }
+
+ /**
+ * Calls the configured Task suspend handler and wraps anything it throws in a TaskSuspendHandlerFailure.
+ */
+ private void suspendTask(final byte[] message) throws TaskSuspendHandlerFailure {
+ try {
+ this.f_suspendHandler.get().onNext(new SuspendEventImpl(message));
+ } catch (final Throwable throwable) {
+ throw new TaskSuspendHandlerFailure(throwable);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java
new file mode 100644
index 0000000..5bd598e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStartImpl.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStart;
+
+import javax.inject.Inject;
+
+/**
+ * Injectable implementation of TaskStart that carries the identifier bound to TaskConfigurationOptions.Identifier.
+ */
+final class TaskStartImpl implements TaskStart {
+
+ private final String id; // the injected identifier, returned unchanged by getId()
+
+ @Inject
+ TaskStartImpl(final @Parameter(TaskConfigurationOptions.Identifier.class) String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getId() { // returns the identifier supplied at injection time
+ return this.id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java
new file mode 100644
index 0000000..2cce16f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStatus.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.TaskMessage;
+import org.apache.reef.task.TaskMessageSource;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Holds the lifecycle state of one Task together with its result or last exception,
+ * and sends status updates through the HeartBeatManager.
+ */
+public final class TaskStatus {
+  private static final Logger LOG = Logger.getLogger(TaskStatus.class.getName());
+
+  private final String taskId;
+  private final String contextId;
+  private final HeartBeatManager heartBeatManager;
+  private final Set<TaskMessageSource> evaluatorMessageSources;
+  private final ExceptionCodec exceptionCodec;
+  private Optional<Throwable> lastException = Optional.empty();
+  private Optional<byte[]> result = Optional.empty();
+  private State state = State.PRE_INIT;
+
+  @Inject
+  TaskStatus(@Parameter(TaskConfigurationOptions.Identifier.class) final String taskId,
+             @Parameter(ContextIdentifier.class) final String contextId,
+             @Parameter(TaskConfigurationOptions.TaskMessageSources.class) final Set<TaskMessageSource> evaluatorMessageSources,
+             final HeartBeatManager heartBeatManager,
+             final ExceptionCodec exceptionCodec) {
+    this.taskId = taskId;
+    this.contextId = contextId;
+    this.heartBeatManager = heartBeatManager;
+    this.evaluatorMessageSources = evaluatorMessageSources;
+    this.exceptionCodec = exceptionCodec;
+  }
+
+  /**
+   * Decides whether a state change is permitted.
+   * <ul>
+   * <li>null or PRE_INIT may only move to INIT;</li>
+   * <li>INIT may move to RUNNING, FAILED, KILLED or DONE;</li>
+   * <li>RUNNING may move to CLOSE_REQUESTED, SUSPEND_REQUESTED, FAILED, KILLED or DONE;</li>
+   * <li>CLOSE_REQUESTED may move to FAILED, KILLED or DONE;</li>
+   * <li>SUSPEND_REQUESTED may move to FAILED, KILLED or SUSPENDED;</li>
+   * <li>every other state has no legal successor.</li>
+   * </ul>
+   *
+   * @param from the state being left; may be null.
+   * @param to   the state being entered.
+   * @return true, if the state transition from state 'from' to state 'to' is legal.
+   */
+  private static boolean isLegal(final State from, final State to) {
+    if (from == null || from == State.PRE_INIT) {
+      return to == State.INIT;
+    }
+    // FAILED and KILLED are reachable from every state handled by the switch below.
+    final boolean toFailedOrKilled = to == State.FAILED || to == State.KILLED;
+    switch (from) {
+      case INIT:
+        return toFailedOrKilled || to == State.RUNNING || to == State.DONE;
+      case RUNNING:
+        return toFailedOrKilled
+            || to == State.CLOSE_REQUESTED
+            || to == State.SUSPEND_REQUESTED
+            || to == State.DONE;
+      case CLOSE_REQUESTED:
+        return toFailedOrKilled || to == State.DONE;
+      case SUSPEND_REQUESTED:
+        return toFailedOrKilled || to == State.SUSPENDED;
+      default:
+        // SUSPENDED, FAILED, DONE and KILLED cannot be left again.
+        return false;
+    }
+  }
+
+  /**
+   * @return the protocol buffer message built from the given TaskMessage's source id and payload.
+   */
+  private static ReefServiceProtos.TaskStatusProto.TaskMessageProto toMessageProto(final TaskMessage taskMessage) {
+    return ReefServiceProtos.TaskStatusProto.TaskMessageProto.newBuilder()
+        .setSourceId(taskMessage.getMessageSourceID())
+        .setMessage(ByteString.copyFrom(taskMessage.get()))
+        .build();
+  }
+
+  public final String getTaskId() {
+    return this.taskId;
+  }
+
+  /**
+   * Builds the status message for this Task. The result is attached if one is present;
+   * otherwise the encoded last exception if one is present; otherwise, while RUNNING,
+   * the pending task messages are attached.
+   *
+   * @return the status of this Task as a TaskStatusProto.
+   */
+  ReefServiceProtos.TaskStatusProto toProto() {
+    this.check();
+    final ReefServiceProtos.TaskStatusProto.Builder builder = ReefServiceProtos.TaskStatusProto.newBuilder();
+    builder.setContextId(this.contextId);
+    builder.setTaskId(this.taskId);
+    builder.setState(this.getProtoState());
+
+    if (this.result.isPresent()) {
+      builder.setResult(ByteString.copyFrom(this.result.get()));
+    } else if (this.lastException.isPresent()) {
+      final byte[] encodedError = this.exceptionCodec.toBytes(this.lastException.get());
+      builder.setResult(ByteString.copyFrom(encodedError));
+    } else if (this.state == State.RUNNING) {
+      for (final TaskMessage pending : this.getMessages()) {
+        builder.addTaskMessage(toMessageProto(pending));
+      }
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Fails if both a result and an exception have been recorded.
+   */
+  private void check() {
+    if (!this.result.isPresent() || !this.lastException.isPresent()) {
+      return;
+    }
+    throw new RuntimeException("Found both an exception and a result. This is unsupported.");
+  }
+
+  /**
+   * Maps the internal state onto the protocol state. CLOSE_REQUESTED and SUSPEND_REQUESTED
+   * are both reported as RUNNING; PRE_INIT has no protocol counterpart and is rejected.
+   */
+  private ReefServiceProtos.State getProtoState() {
+    switch (this.state) {
+      case INIT:
+        return ReefServiceProtos.State.INIT;
+      case RUNNING:
+      case CLOSE_REQUESTED:
+      case SUSPEND_REQUESTED:
+        return ReefServiceProtos.State.RUNNING;
+      case SUSPENDED:
+        return ReefServiceProtos.State.SUSPEND;
+      case DONE:
+        return ReefServiceProtos.State.DONE;
+      case FAILED:
+        return ReefServiceProtos.State.FAILED;
+      case KILLED:
+        return ReefServiceProtos.State.KILLED;
+      default:
+        throw new RuntimeException("Unknown state: " + this.state);
+    }
+  }
+
+  /**
+   * Records the exception, switches to FAILED without consulting isLegal(),
+   * and sends a heartbeat.
+   */
+  void setException(final Throwable throwable) {
+    synchronized (this.heartBeatManager) {
+      this.lastException = Optional.of(throwable);
+      this.state = State.FAILED;
+      this.check();
+      this.heartbeat();
+    }
+  }
+
+  /**
+   * Records the result and sends a heartbeat. RUNNING and CLOSE_REQUESTED advance to DONE,
+   * SUSPEND_REQUESTED advances to SUSPENDED; any other state is left as it is.
+   */
+  void setResult(final byte[] result) {
+    synchronized (this.heartBeatManager) {
+      this.result = Optional.ofNullable(result);
+      if (this.state == State.RUNNING || this.state == State.CLOSE_REQUESTED) {
+        this.setState(State.DONE);
+      } else if (this.state == State.SUSPEND_REQUESTED) {
+        this.setState(State.SUSPENDED);
+      }
+      this.check();
+      this.heartbeat();
+    }
+  }
+
+  /**
+   * Hands the current status to the HeartBeatManager.
+   */
+  private void heartbeat() {
+    this.heartBeatManager.sendTaskStatus(this.toProto());
+  }
+
+  /**
+   * Sets the state to INIT and informs the driver about it.
+   */
+  void setInit() {
+    LOG.log(Level.FINEST, "Sending Task INIT heartbeat to the Driver.");
+    this.setState(State.INIT);
+    this.heartbeat();
+  }
+
+  /**
+   * Sets the state to RUNNING after the handlers for TaskStart have been called.
+   */
+  void setRunning() {
+    this.setState(State.RUNNING);
+  }
+
+  /** Moves to CLOSE_REQUESTED; no heartbeat is sent. */
+  void setCloseRequested() {
+    this.setState(State.CLOSE_REQUESTED);
+  }
+
+  /** Moves to SUSPEND_REQUESTED; no heartbeat is sent. */
+  void setSuspendRequested() {
+    this.setState(State.SUSPEND_REQUESTED);
+  }
+
+  /** Moves to KILLED and sends a heartbeat. */
+  void setKilled() {
+    this.setState(State.KILLED);
+    this.heartbeat();
+  }
+
+  boolean isRunning() {
+    return this.state == State.RUNNING;
+  }
+
+  boolean isNotRunning() {
+    return !this.isRunning();
+  }
+
+  /**
+   * @return true if the state is DONE, SUSPENDED, FAILED or KILLED.
+   */
+  boolean hasEnded() {
+    final State current = this.state;
+    return current == State.DONE
+        || current == State.SUSPENDED
+        || current == State.FAILED
+        || current == State.KILLED;
+  }
+
+  State getState() {
+    return this.state;
+  }
+
+  /**
+   * Applies the transition if isLegal() allows it; otherwise logs and throws.
+   */
+  private void setState(final State state) {
+    if (!isLegal(this.state, state)) {
+      final String msg = "Illegal state transition from [" + this.state + "] to [" + state + "]";
+      LOG.log(Level.SEVERE, msg);
+      throw new RuntimeException(msg);
+    }
+    this.state = state;
+  }
+
+  String getContextId() {
+    return this.contextId;
+  }
+
+  /**
+   * @return the messages to be sent on the Task's behalf in the next heartbeat.
+   */
+  private Collection<TaskMessage> getMessages() {
+    final List<TaskMessage> collected = new ArrayList<>(this.evaluatorMessageSources.size());
+    for (final TaskMessageSource source : this.evaluatorMessageSources) {
+      final Optional<TaskMessage> candidate = source.getMessage();
+      if (!candidate.isPresent()) {
+        continue;
+      }
+      collected.add(candidate.get());
+    }
+    return collected;
+  }
+
+  /**
+   * The states a Task passes through; see isLegal() for the permitted transitions.
+   */
+  enum State {
+    PRE_INIT,
+    INIT,
+    RUNNING,
+    CLOSE_REQUESTED,
+    SUSPEND_REQUESTED,
+    SUSPENDED,
+    FAILED,
+    DONE,
+    KILLED
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java
new file mode 100644
index 0000000..521d859
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/TaskStopImpl.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
+
+import org.apache.reef.driver.task.TaskConfigurationOptions;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.task.events.TaskStop;
+
+import javax.inject.Inject;
+
+/**
+ * Implementation of TaskStop that is created through injection.
+ */
+final class TaskStopImpl implements TaskStop {
+
+  /** Identifier received by the constructor; getId() returns it unchanged. */
+  private final String taskId;
+
+  /**
+   * @param id the value bound to TaskConfigurationOptions.Identifier.
+   */
+  @Inject
+  TaskStopImpl(@Parameter(TaskConfigurationOptions.Identifier.class) final String id) {
+    taskId = id;
+  }
+
+  /**
+   * @return the identifier this instance was constructed with.
+   */
+  @Override
+  public String getId() {
+    return taskId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java
new file mode 100644
index 0000000..6920f82
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultCloseHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.CloseEvent;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default {@code EventHandler<CloseEvent>}: rejects every event it receives.
+ */
+@Private
+public final class DefaultCloseHandler implements EventHandler<CloseEvent> {
+
+  @Inject
+  public DefaultCloseHandler() {
+    // Nothing to initialize.
+  }
+
+  /**
+   * Always fails, embedding the received event in the exception message.
+   *
+   * @param closeEvent the event that was delivered.
+   * @throws RuntimeException on every call.
+   */
+  @Override
+  public void onNext(final CloseEvent closeEvent) {
+    final String message = "No EventHandler<CloseEvent> registered. Event received: " + closeEvent;
+    throw new RuntimeException(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java
new file mode 100644
index 0000000..f1b4f34
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultDriverMessageHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.DriverMessage;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default {@code EventHandler<DriverMessage>}: rejects every message it receives.
+ */
+@Private
+public final class DefaultDriverMessageHandler implements EventHandler<DriverMessage> {
+
+  @Inject
+  public DefaultDriverMessageHandler() {
+    // Nothing to initialize.
+  }
+
+  /**
+   * Always fails, embedding the received message in the exception message.
+   *
+   * @param driverMessage the message that was delivered.
+   * @throws RuntimeException on every call.
+   */
+  @Override
+  public void onNext(final DriverMessage driverMessage) {
+    final String message = "No DriverMessage handler bound. Message received:" + driverMessage;
+    throw new RuntimeException(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java
new file mode 100644
index 0000000..3b485c6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/DefaultSuspendHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.SuspendEvent;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default {@code EventHandler<SuspendEvent>}: rejects every event it receives.
+ */
+@Private
+public final class DefaultSuspendHandler implements EventHandler<SuspendEvent> {
+
+  @Inject
+  public DefaultSuspendHandler() {
+    // Nothing to initialize.
+  }
+
+  /**
+   * Always fails, embedding the received event in the exception message.
+   *
+   * @param suspendEvent the event that was delivered.
+   * @throws RuntimeException on every call.
+   */
+  @Override
+  public void onNext(final SuspendEvent suspendEvent) {
+    final String message = "No handler for SuspendEvent registered. event: " + suspendEvent;
+    throw new RuntimeException(message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java
new file mode 100644
index 0000000..d69a4aa
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional task interfaces.
+ */
+package org.apache.reef.runtime.common.evaluator.task.defaults;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java
new file mode 100644
index 0000000..9827a01
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCallFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception wrapping whatever Task.call() threw.
+ */
+@EvaluatorSide
+@Private
+public final class TaskCallFailure extends Exception {
+
+  /** Fixed detail message used for every instance. */
+  private static final String MESSAGE = "Task.call() threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the Task.call() method.
+   */
+  public TaskCallFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java
new file mode 100644
index 0000000..bb18044
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskCloseHandlerFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception wrapping whatever a Task close handler threw.
+ */
+@EvaluatorSide
+@Private
+public final class TaskCloseHandlerFailure extends Exception {
+
+  /** Fixed detail message used for every instance. */
+  private static final String MESSAGE = "Task close handler threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the task close handler.
+   */
+  public TaskCloseHandlerFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java
new file mode 100644
index 0000000..8ccf1d1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskMessageHandlerFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception wrapping whatever a Task message handler threw.
+ */
+@EvaluatorSide
+@Private
+public final class TaskMessageHandlerFailure extends Exception {
+
+  /** Fixed detail message used for every instance. */
+  private static final String MESSAGE = "Task message handler threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the task message handler's onNext() method.
+   */
+  public TaskMessageHandlerFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java
new file mode 100644
index 0000000..24dcb35
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStartHandlerFailure.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.TaskStart;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Thrown when a TaskStart handler throws an exception.
+ */
+@EvaluatorSide
+@Private
+public final class TaskStartHandlerFailure extends Exception {
+
+  /**
+   * @param handler the start handler whose onNext() threw; its string form is embedded
+   *                in the detail message.
+   * @param cause   the exception thrown by the start handler
+   */
+  public TaskStartHandlerFailure(final EventHandler<TaskStart> handler, final Throwable cause) {
+    // Plain concatenation renders a null handler as "null", so building this
+    // exception can never itself throw and hide the original cause.
+    super("EventHandler<TaskStart> `" + handler + "` threw an Exception in onNext()", cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java
new file mode 100644
index 0000000..05d77bc
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskStopHandlerFailure.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.task.events.TaskStop;
+import org.apache.reef.wake.EventHandler;
+
+/**
+ * Thrown when a TaskStop handler throws an exception.
+ */
+@EvaluatorSide
+@Private
+public final class TaskStopHandlerFailure extends Exception {
+
+  /**
+   * @param handler the stop handler whose onNext() threw; its string form is embedded
+   *                in the detail message.
+   * @param cause   the exception thrown by the stop handler
+   */
+  public TaskStopHandlerFailure(final EventHandler<TaskStop> handler, final Throwable cause) {
+    // Plain concatenation renders a null handler as "null", so building this
+    // exception can never itself throw and hide the original cause.
+    super("EventHandler<TaskStop> `" + handler + "` threw an Exception in onNext()", cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java
new file mode 100644
index 0000000..31b275e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/exceptions/TaskSuspendHandlerFailure.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.task.exceptions;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Checked exception wrapping whatever a Task suspend handler threw.
+ */
+@EvaluatorSide
+@Private
+public final class TaskSuspendHandlerFailure extends Exception {
+
+  /** Fixed detail message used for every instance. */
+  private static final String MESSAGE = "Task suspend handler threw an Exception.";
+
+  /**
+   * @param cause the exception thrown by the task suspend handler's onNext() method.
+   */
+  public TaskSuspendHandlerFailure(final Throwable cause) {
+    super(MESSAGE, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java
new file mode 100644
index 0000000..1329251
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/task/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Task-related implementation of the Evaluator resourcemanager.
+ */
+package org.apache.reef.runtime.common.evaluator.task;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java
new file mode 100644
index 0000000..a1ad04a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/ClasspathProvider.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import net.jcip.annotations.Immutable;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Supplies the classpath to REEF for process (Driver, Evaluator) launches.
+ */
+@Immutable
+@RuntimeAuthor
+public final class ClasspathProvider {
+ private final List<String> driverClasspath;
+ private final List<String> evaluatorClasspath;
+
+
+ @Inject
+ ClasspathProvider(final RuntimeClasspathProvider runtimeClasspathProvider,
+ final REEFFileNames reefFileNames) {
+ final List<String> baseClasspath = Arrays.asList(
+ reefFileNames.getLocalFolderPath() + "/*",
+ reefFileNames.getGlobalFolderPath() + "/*");
+
+ // Assemble the driver classpath
+ final List<String> runtimeDriverClasspathPrefix = runtimeClasspathProvider.getDriverClasspathPrefix();
+ final List<String> runtimeDriverClasspathSuffix = runtimeClasspathProvider.getDriverClasspathSuffix();
+ final List<String> driverClasspath = new ArrayList<>(baseClasspath.size() +
+ runtimeDriverClasspathPrefix.size() +
+ runtimeDriverClasspathSuffix.size());
+ driverClasspath.addAll(runtimeDriverClasspathPrefix);
+ driverClasspath.addAll(baseClasspath);
+ driverClasspath.addAll(runtimeDriverClasspathSuffix);
+ this.driverClasspath = Collections.unmodifiableList(driverClasspath);
+
+ // Assemble the evaluator classpath
+ final List<String> runtimeEvaluatorClasspathPrefix = runtimeClasspathProvider.getEvaluatorClasspathPrefix();
+ final List<String> runtimeEvaluatorClasspathSuffix = runtimeClasspathProvider.getEvaluatorClasspathSuffix();
+ final List<String> evaluatorClasspath = new ArrayList<>(runtimeEvaluatorClasspathPrefix.size() +
+ baseClasspath.size() +
+ runtimeEvaluatorClasspathSuffix.size());
+ evaluatorClasspath.addAll(runtimeEvaluatorClasspathPrefix);
+ evaluatorClasspath.addAll(baseClasspath);
+ evaluatorClasspath.addAll(runtimeEvaluatorClasspathSuffix);
+ this.evaluatorClasspath = Collections.unmodifiableList(evaluatorClasspath);
+ }
+
+ /**
+ * @return the classpath to be used for the Driver
+ */
+ public List<String> getDriverClasspath() {
+ return this.driverClasspath;
+ }
+
+ /**
+ * @return the classpath to be used for Evaluators.
+ */
+ public List<String> getEvaluatorClasspath() {
+ return this.evaluatorClasspath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
new file mode 100644
index 0000000..5297158
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/JobJarMaker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.annotations.audience.RuntimeAuthor;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.parameters.DeleteTempFiles;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.JARFileMaker;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Utility that takes a JobSubmissionProto and turns it into a Job Submission Jar.
+ */
+@Private
+@RuntimeAuthor
+@ClientSide
+public final class JobJarMaker {
+
+ private static final Logger LOG = Logger.getLogger(JobJarMaker.class.getName());
+
+ private final ConfigurationSerializer configurationSerializer;
+ private final REEFFileNames fileNames;
+ private final boolean deleteTempFilesOnExit;
+
+ @Inject
+ JobJarMaker(final ConfigurationSerializer configurationSerializer,
+ final REEFFileNames fileNames,
+ final @Parameter(DeleteTempFiles.class) boolean deleteTempFilesOnExit) {
+ this.configurationSerializer = configurationSerializer;
+ this.fileNames = fileNames;
+ this.deleteTempFilesOnExit = deleteTempFilesOnExit;
+ }
+
+ public static void copy(final Iterable<ReefServiceProtos.FileResourceProto> files, final File destinationFolder) {
+
+ if (!destinationFolder.exists()) {
+ destinationFolder.mkdirs();
+ }
+
+ for (final ReefServiceProtos.FileResourceProto fileProto : files) {
+ final File sourceFile = toFile(fileProto);
+ final File destinationFile = new File(destinationFolder, fileProto.getName());
+ if (destinationFile.exists()) {
+ LOG.log(Level.FINEST,
+ "Will not add {0} to the job jar because another file with the same name was already added.",
+ sourceFile.getAbsolutePath()
+ );
+ } else {
+ try {
+ java.nio.file.Files.copy(sourceFile.toPath(), destinationFile.toPath());
+ } catch (final IOException e) {
+ final String message = new StringBuilder("Copy of file [")
+ .append(sourceFile.getAbsolutePath())
+ .append("] to [")
+ .append(destinationFile.getAbsolutePath())
+ .append("] failed.")
+ .toString();
+ throw new RuntimeException(message, e);
+ }
+ }
+ }
+ }
+
+ private static File toFile(final ReefServiceProtos.FileResourceProto fileProto) {
+ return new File(fileProto.getPath());
+ }
+
+ public File createJobSubmissionJAR(
+ final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto,
+ final Configuration driverConfiguration) throws IOException {
+
+ // Copy all files to a local job submission folder
+ final File jobSubmissionFolder = makejobSubmissionFolder();
+ LOG.log(Level.FINE, "Staging submission in {0}", jobSubmissionFolder);
+
+ final File localFolder = new File(jobSubmissionFolder, this.fileNames.getLocalFolderName());
+ final File globalFolder = new File(jobSubmissionFolder, this.fileNames.getGlobalFolderName());
+
+ this.copy(jobSubmissionProto.getGlobalFileList(), globalFolder);
+ this.copy(jobSubmissionProto.getLocalFileList(), localFolder);
+
+ // Store the Driver Configuration in the JAR file.
+ this.configurationSerializer.toFile(
+ driverConfiguration, new File(localFolder, this.fileNames.getDriverConfigurationName()));
+
+ // Create a JAR File for the submission
+ final File jarFile = File.createTempFile(this.fileNames.getJobFolderPrefix(), this.fileNames.getJarFileSuffix());
+
+ LOG.log(Level.FINE, "Creating job submission jar file: {0}", jarFile);
+ new JARFileMaker(jarFile).addChildren(jobSubmissionFolder).close();
+
+ if (this.deleteTempFilesOnExit) {
+ LOG.log(Level.FINE,
+ "Deleting the temporary job folder [{0}] and marking the jar file [{1}] for deletion after the JVM exits.",
+ new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
+ jobSubmissionFolder.delete();
+ jarFile.deleteOnExit();
+ } else {
+ LOG.log(Level.FINE, "Keeping the temporary job folder [{0}] and jar file [{1}] available after job submission.",
+ new Object[]{jobSubmissionFolder.getAbsolutePath(), jarFile.getAbsolutePath()});
+ }
+ return jarFile;
+ }
+
+ private File makejobSubmissionFolder() throws IOException {
+ return Files.createTempDirectory(this.fileNames.getJobFolderPrefix()).toFile();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
new file mode 100644
index 0000000..0fac287
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/REEFFileNames.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import net.jcip.annotations.Immutable;
+
+import javax.inject.Inject;
+import java.io.File;
+
+/**
+ * Access to the various places things go according to the REEF file system standard.
+ */
+@Immutable
+public final class REEFFileNames {
+
+ private static final String REEF_BASE_FOLDER = "reef";
+ private static final String GLOBAL_FOLDER = "global";
+ static final String GLOBAL_FOLDER_PATH = REEF_BASE_FOLDER + '/' + GLOBAL_FOLDER;
+ private static final String LOCAL_FOLDER = "local";
+ static final String LOCAL_FOLDER_PATH = REEF_BASE_FOLDER + '/' + LOCAL_FOLDER;
+ private static final String DRIVER_CONFIGURATION_NAME = "driver.conf";
+ private static final String DRIVER_CONFIGURATION_PATH =
+ LOCAL_FOLDER_PATH + '/' + DRIVER_CONFIGURATION_NAME;
+ private static final String EVALUATOR_CONFIGURATION_NAME = "evaluator.conf";
+ private static final String EVALUATOR_CONFIGURATION_PATH =
+ LOCAL_FOLDER_PATH + '/' + EVALUATOR_CONFIGURATION_NAME;
+ private static final String JAR_FILE_SUFFIX = ".jar";
+ private static final String JOB_FOLDER_PREFIX = "reef-job-";
+ private static final String EVALUATOR_FOLDER_PREFIX = "reef-evaluator-";
+ private static final String DRIVER_STDERR = "driver.stderr";
+ private static final String DRIVER_STDOUT = "driver.stdout";
+ private static final String EVALUATOR_STDERR = "evaluator.stderr";
+ private static final String EVALUATOR_STDOUT = "evaluator.stdout";
+ private static final String CPP_BRIDGE = "JavaClrBridge";
+ private static final String REEF_GLOBAL = "/reef/global";
+ private static final String REEF_DRIVER_APPDLL_DIR = "/ReefDriverAppDlls/";
+ private static final String TMP_LOAD_DIR = "/reef/CLRLoadingDirectory";
+
+ @Inject
+ public REEFFileNames() {
+ }
+
+ /**
+ * The name of the REEF folder inside of the working directory of an Evaluator or Driver
+ */
+ public String getREEFFolderName() {
+ return REEF_BASE_FOLDER;
+ }
+
+ /**
+ * @return the folder under which all REEF files are stored.
+ */
+ public File getREEFFolder() {
+ return new File(getREEFFolderName());
+ }
+
+
+ /**
+ * @return the name of the folder inside of REEF_BASE_FOLDER that houses the global files.
+ */
+ public String getGlobalFolderName() {
+ return GLOBAL_FOLDER;
+ }
+
+ /**
+ * @return the path to the global folder: REEF_BASE_FOLDER/GLOBAL_FOLDER
+ */
+ public String getGlobalFolderPath() {
+ return GLOBAL_FOLDER_PATH;
+ }
+
+ /**
+ * @return the folder holding the files global to all containers.
+ */
+ public File getGlobalFolder() {
+ return new File(getREEFFolder(), getGlobalFolderName());
+ }
+
+
+ /**
+ * @return the name of the folder inside of REEF_BASE_FOLDER that houses the local files.
+ */
+ public String getLocalFolderName() {
+ return LOCAL_FOLDER;
+ }
+
+ /**
+ * @return the path to the local folder: REEF_BASE_FOLDER/LOCAL_FOLDER
+ */
+ public String getLocalFolderPath() {
+ return LOCAL_FOLDER_PATH;
+ }
+
+ /**
+ * @return the folder holding the files local to this container.
+ */
+ public File getLocalFolder() {
+ return new File(getREEFFolder(), getLocalFolderName());
+ }
+
+
+ /**
+ * The name under which the driver configuration will be stored in REEF_BASE_FOLDER/LOCAL_FOLDER
+ */
+ public String getDriverConfigurationName() {
+ return DRIVER_CONFIGURATION_NAME;
+ }
+
+ /**
+ * @return The path to the driver configuration: REEF_BASE_FOLDER/LOCAL_FOLDER/DRIVER_CONFIGURATION_NAME
+ */
+ public String getDriverConfigurationPath() {
+ return DRIVER_CONFIGURATION_PATH;
+ }
+
+ /**
+ * @return The name under which the evaluator configuration will be stored in REEF_BASE_FOLDER/LOCAL_FOLDER
+ */
+ public String getEvaluatorConfigurationName() {
+ return EVALUATOR_CONFIGURATION_NAME;
+ }
+
+ /**
+ * @return the path to the evaluator configuration.
+ */
+ public String getEvaluatorConfigurationPath() {
+ return EVALUATOR_CONFIGURATION_PATH;
+ }
+
+ /**
+ * @return The suffix used for JAR files, including the "."
+ */
+ public String getJarFileSuffix() {
+ return JAR_FILE_SUFFIX;
+ }
+
+ /**
+ * The prefix used whenever REEF is asked to create a job folder, on (H)DFS or locally.
+ * <p>
+ * This prefix is also used with JAR files created to represent a job.
+ */
+ public String getJobFolderPrefix() {
+ return JOB_FOLDER_PREFIX;
+ }
+
+ /**
+ * @return The name used within the current working directory of the driver to redirect standard error to.
+ */
+ public String getDriverStderrFileName() {
+ return DRIVER_STDERR;
+ }
+
+ /**
+ * @return The name used within the current working directory of the driver to redirect standard out to.
+ */
+ public String getDriverStdoutFileName() {
+ return DRIVER_STDOUT;
+ }
+
+ /**
+ * @return The prefix used whenever REEF is asked to create an Evaluator folder, e.g. for staging.
+ */
+ public String getEvaluatorFolderPrefix() {
+ return EVALUATOR_FOLDER_PREFIX;
+ }
+
+ /**
+ * @return The name used within the current working directory of the evaluator to redirect standard error to.
+ */
+ public String getEvaluatorStderrFileName() {
+ return EVALUATOR_STDERR;
+ }
+
+ /**
+ * @return The name used within the current working directory of the evaluator to redirect standard out to.
+ */
+ public String getEvaluatorStdoutFileName() {
+ return EVALUATOR_STDOUT;
+ }
+
+ /**
+ * @return the name of cpp bridge file
+ */
+ public String getCppBridge() { return CPP_BRIDGE; }
+
+ /**
+ * @return reef global file folder
+ */
+ public String getReefGlobal() { return REEF_GLOBAL; }
+
+ /**
+ * @return reef driver app dll directory
+ */
+ public String getReefDriverAppDllDir() { return REEF_DRIVER_APPDLL_DIR; }
+
+ /**
+ * @return temp load directory
+ */
+ public String getLoadDir() { return TMP_LOAD_DIR; }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java
new file mode 100644
index 0000000..8d753c8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/RuntimeClasspathProvider.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.files;
+
+import java.util.List;
+
+/**
+ * Interface to be implemented by each REEF runtime (YARN, Mesos, Local) to provide additional classpath elements to be
+ * pre- and postfixed to the user classpath.
+ */
+public interface RuntimeClasspathProvider {
+
+ /**
+ * @return the classpath to be prefixed in front of the user classpath of the Driver.
+ */
+ List<String> getDriverClasspathPrefix();
+
+ /**
+ * @return the classpath to be appended after the user classpath of the Driver.
+ */
+ List<String> getDriverClasspathSuffix();
+
+ /**
+ * @return the classpath to be prefixed in front of the user classpath of each Evaluator.
+ */
+ List<String> getEvaluatorClasspathPrefix();
+
+ /**
+ * @return the classpath to be appended after the user classpath of each Evaluator.
+ */
+ List<String> getEvaluatorClasspathSuffix();
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java
new file mode 100644
index 0000000..e0bfda0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/files/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This package will contain the implementation of the REEF file system standard.
+ */
+package org.apache.reef.runtime.common.files;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
new file mode 100644
index 0000000..7c5334b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/launch/CLRLaunchCommandBuilder.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.launch;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A builder for the command line to launch a CLR Evaluator.
+ */
+public class CLRLaunchCommandBuilder implements LaunchCommandBuilder {
+ private static final Logger LOG = Logger.getLogger(CLRLaunchCommandBuilder.class.getName());
+ private static final String EVALUATOR_PATH = "reef/global/Microsoft.Reef.Evaluator.exe";
+
+
+ private String standardErrPath = null;
+ private String standardOutPath = null;
+ private String errorHandlerRID = null;
+ private String launchID = null;
+ private int megaBytes = 0;
+ private String evaluatorConfigurationPath = null;
+
+ @Override
+ public List<String> build() {
+ final List<String> result = new LinkedList<>();
+ File f = new File(EVALUATOR_PATH);
+ if (!f.exists()) {
+ LOG.log(Level.WARNING, "file can NOT be found: {0}", f.getAbsolutePath());
+ }
+ result.add(f.getPath());
+ result.add(errorHandlerRID);
+ result.add(evaluatorConfigurationPath);
+ if ((null != this.standardOutPath) && (!standardOutPath.isEmpty())) {
+ result.add(">" + this.standardOutPath);
+ }
+ if ((null != this.standardErrPath) && (!standardErrPath.isEmpty())) {
+ result.add("2>" + this.standardErrPath);
+ }
+ LOG.log(Level.FINE, "Launch Exe: {0}", StringUtils.join(result, ' '));
+ return result;
+ }
+
+ @Override
+ public CLRLaunchCommandBuilder setErrorHandlerRID(final String errorHandlerRID) {
+ this.errorHandlerRID = errorHandlerRID;
+ return this;
+ }
+
+ @Override
+ public CLRLaunchCommandBuilder setLaunchID(final String launchID) {
+ this.launchID = launchID;
+ return this;
+ }
+
+ @Override
+ public CLRLaunchCommandBuilder setMemory(final int megaBytes) {
+ this.megaBytes = megaBytes;
+ return this;
+ }
+
+ @Override
+ public CLRLaunchCommandBuilder setConfigurationFileName(final String configurationFileName) {
+ this.evaluatorConfigurationPath = configurationFileName;
+ return this;
+ }
+
+ @Override
+ public CLRLaunchCommandBuilder setStandardOut(final String standardOut) {
+ this.standardOutPath = standardOut;
+ return this;
+ }
+
+ @Override
+ public CLRLaunchCommandBuilder setStandardErr(final String standardErr) {
+ this.standardErrPath = standardErr;
+ return this;
+ }
+}