You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:09 UTC
[36/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java
new file mode 100644
index 0000000..5dc7b83
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/HeartBeatManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.context.ContextManager;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.HeartbeatPeriod;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+public class HeartBeatManager {
+
+ private static final Logger LOG = Logger.getLogger(HeartBeatManager.class.getName());
+
+ private final Clock clock;
+ private final int heartbeatPeriod;
+ private final EventHandler<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatHandler;
+ private final InjectionFuture<EvaluatorRuntime> evaluatorRuntime;
+ private final InjectionFuture<ContextManager> contextManager;
+
+ @Inject
+ private HeartBeatManager(
+ final InjectionFuture<EvaluatorRuntime> evaluatorRuntime,
+ final InjectionFuture<ContextManager> contextManager,
+ final Clock clock,
+ final RemoteManager remoteManager,
+ final @Parameter(HeartbeatPeriod.class) int heartbeatPeriod,
+ final @Parameter(DriverRemoteIdentifier.class) String driverRID) {
+
+ this.evaluatorRuntime = evaluatorRuntime;
+ this.contextManager = contextManager;
+ this.clock = clock;
+ this.heartbeatPeriod = heartbeatPeriod;
+ this.evaluatorHeartbeatHandler = remoteManager.getHandler(
+ driverRID, EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.class);
+ }
+
+ /**
+ * Assemble a complete new heartbeat from the current evaluator, context and task status and send it out.
+ */
+ public synchronized void sendHeartbeat() {
+ this.sendHeartBeat(this.getEvaluatorHeartbeatProto());
+ }
+
+ /**
+ * Called with a specific TaskStatus that must be delivered to the driver.
+ */
+ public synchronized void sendTaskStatus(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+ this.sendHeartBeat(this.getEvaluatorHeartbeatProto(
+ this.evaluatorRuntime.get().getEvaluatorStatus(),
+ this.contextManager.get().getContextStatusCollection(),
+ Optional.of(taskStatusProto)));
+ }
+
+ /**
+ * Called with a specific ContextStatus that must be delivered to the driver.
+ */
+ public synchronized void sendContextStatus(
+ final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+
+ // The given status goes first, followed by the status of every context on the stack. TODO: test this order.
+ final Collection<ReefServiceProtos.ContextStatusProto> contextStatusList = new ArrayList<>();
+ contextStatusList.add(contextStatusProto);
+ contextStatusList.addAll(this.contextManager.get().getContextStatusCollection());
+
+ final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto =
+ this.getEvaluatorHeartbeatProto(
+ this.evaluatorRuntime.get().getEvaluatorStatus(),
+ contextStatusList, Optional.<ReefServiceProtos.TaskStatusProto>empty());
+
+ this.sendHeartBeat(heartbeatProto);
+ }
+
+ /**
+ * Called with a specific EvaluatorStatus that must be delivered to the driver.
+ */
+ public synchronized void sendEvaluatorStatus(
+ final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) {
+ this.sendHeartBeat(EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder()
+ .setTimestamp(System.currentTimeMillis())
+ .setEvaluatorStatus(evaluatorStatusProto)
+ .build());
+ }
+
+ /**
+ * Logs the heartbeat (with a stack trace) if FINEST logging is enabled, then sends it out.
+ *
+ * @param heartbeatProto the heartbeat message to send.
+ */
+ private synchronized void sendHeartBeat(
+ final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeatProto) {
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "Heartbeat message:\n" + heartbeatProto, new Exception("Stack trace"));
+ }
+ this.evaluatorHeartbeatHandler.onNext(heartbeatProto);
+ }
+
+ /** Assembles a heartbeat from the current evaluator status, context status collection and task status. */
+ private EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto() {
+ return this.getEvaluatorHeartbeatProto(
+ this.evaluatorRuntime.get().getEvaluatorStatus(),
+ this.contextManager.get().getContextStatusCollection(),
+ this.contextManager.get().getTaskStatus());
+ }
+
+ private final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto getEvaluatorHeartbeatProto(
+ final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto,
+ final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos,
+ final Optional<ReefServiceProtos.TaskStatusProto> taskStatusProto) {
+
+ final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.Builder builder =
+ EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto.newBuilder()
+ .setTimestamp(System.currentTimeMillis())
+ .setEvaluatorStatus(evaluatorStatusProto);
+
+ for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) {
+ builder.addContextStatus(contextStatusProto);
+ }
+
+ if (taskStatusProto.isPresent()) {
+ builder.setTaskStatus(taskStatusProto.get());
+ }
+
+ return builder.build();
+ }
+
+ final class HeartbeatAlarmHandler implements EventHandler<Alarm> {
+ @Override
+ public void onNext(final Alarm alarm) {
+ synchronized (HeartBeatManager.this) {
+ if (evaluatorRuntime.get().isRunning()) {
+ HeartBeatManager.this.sendHeartbeat();
+ HeartBeatManager.this.clock.scheduleAlarm(HeartBeatManager.this.heartbeatPeriod, this);
+ } else {
+ LOG.log(Level.FINEST,
+ "Not triggering a heartbeat, because state is: {0}",
+ evaluatorRuntime.get().getState());
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java
new file mode 100644
index 0000000..ea3d76a
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/PIDStoreStartHandler.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import org.apache.reef.util.OSUtils;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.StartTime;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Writes the Process ID (PID) to the file named by PID_FILE_NAME (a relative path); a failure to create it is only logged.
+ */
+public class PIDStoreStartHandler implements EventHandler<StartTime> {
+ public static final String PID_FILE_NAME = "PID.txt";
+ private static final Logger LOG = Logger.getLogger(PIDStoreStartHandler.class.getName());
+
+ @Inject
+ public PIDStoreStartHandler() {
+ }
+
+ @Override
+ public void onNext(final StartTime startTime) {
+ final long pid = OSUtils.getPID();
+ final File outfile = new File(PID_FILE_NAME);
+ LOG.log(Level.FINEST, "Storing pid `" + pid + "` in file " + outfile.getAbsolutePath());
+ try (final PrintWriter p = new PrintWriter((new FileOutputStream(PID_FILE_NAME)))) {
+ p.write(String.valueOf(pid));
+ p.write("\n");
+ } catch (final FileNotFoundException e) {
+ LOG.log(Level.WARNING, "Unable to create PID file.", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java
new file mode 100644
index 0000000..8755f39
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextClientCodeException.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.Optional;
+
+/**
+ * Thrown when we encounter a problem with client code in a context.
+ */
+public final class ContextClientCodeException extends Exception {
+ private final String contextID;
+ private final Optional<String> parentID;
+
+ /**
+ * @param contextID the ID of the failed context.
+ * @param parentID the ID of the failed context's parent, if any.
+ * @param message the error message; it is prefixed with the context ID in the exception message.
+ * @param cause the exception that caused the error.
+ */
+ public ContextClientCodeException(final String contextID,
+ final Optional<String> parentID,
+ final String message,
+ final Throwable cause) {
+ super("Failure in context '" + contextID + "': " + message, cause);
+ this.contextID = contextID;
+ this.parentID = parentID;
+ }
+
+ /**
+ * Extracts a context id from the given configuration.
+ *
+ * @param c the configuration to read the ContextIdentifier named parameter from.
+ * @return the context id in the given configuration.
+ * @throws RuntimeException if the context identifier can't be injected from the configuration.
+ */
+ public static String getIdentifier(final Configuration c) {
+ try {
+ return Tang.Factory.getTang().newInjector(c).getNamedInstance(
+ ContextIdentifier.class);
+ } catch (final InjectionException e) {
+ throw new RuntimeException("Unable to determine context identifier. Giving up.", e);
+ }
+ }
+
+ /**
+ * @return the ID of the failed context
+ */
+ public String getContextID() {
+ return this.contextID;
+ }
+
+ /**
+ * @return the ID of the failed context's parent, if any
+ */
+ public Optional<String> getParentID() {
+ return this.parentID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java
new file mode 100644
index 0000000..586ef89
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextLifeCycle.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.evaluator.context.ContextMessageSource;
+import org.apache.reef.evaluator.context.events.ContextStart;
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.evaluator.context.parameters.*;
+import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+/**
+ * This class is used to trigger all the context life-cycle dependent events.
+ */
+final class ContextLifeCycle {
+
+ private final String identifier;
+ private final Set<EventHandler<ContextStart>> contextStartHandlers;
+ private final Set<EventHandler<ContextStop>> contextStopHandlers;
+ private final Set<ContextMessageSource> contextMessageSources;
+ private final EventHandler<byte[]> contextMessageHandler;
+
+ @Inject
+ ContextLifeCycle(final @Parameter(ContextIdentifier.class) String identifier,
+ final @Parameter(ContextMessageHandlers.class) Set<EventHandler<byte[]>> contextMessageHandlers,
+ final @Parameter(ContextStartHandlers.class) Set<EventHandler<ContextStart>> contextStartHandlers,
+ final @Parameter(ContextStopHandlers.class) Set<EventHandler<ContextStop>> contextStopHandlers,
+ final @Parameter(ContextMessageSources.class) Set<ContextMessageSource> contextMessageSources) {
+ this.identifier = identifier;
+ this.contextStartHandlers = contextStartHandlers;
+ this.contextStopHandlers = contextStopHandlers;
+ this.contextMessageSources = contextMessageSources;
+ this.contextMessageHandler = new BroadCastEventHandler<>(contextMessageHandlers);
+ }
+
+ /**
+ * Fires ContextStart to all registered event handlers.
+ */
+ final void start() {
+ final ContextStart contextStart = new ContextStartImpl(this.identifier);
+ for (final EventHandler<ContextStart> startHandler : this.contextStartHandlers) {
+ startHandler.onNext(contextStart);
+ }
+ }
+
+ /**
+ * Fires ContextStop to all registered event handlers.
+ */
+ final void close() {
+ final ContextStop contextStop = new ContextStopImpl(this.identifier);
+ for (final EventHandler<ContextStop> stopHandler : this.contextStopHandlers) {
+ stopHandler.onNext(contextStop);
+ }
+ }
+
+ /**
+ * Delivers the driver message to the context message handler.
+ *
+ * @param message sent by the driver
+ */
+ final void handleContextMessage(final byte[] message) {
+ this.contextMessageHandler.onNext(message);
+ }
+
+ /**
+ * @return an unmodifiable shallow copy of the set of ContextMessageSources configured.
+ */
+ final Set<ContextMessageSource> getContextMessageSources() {
+ return Collections.unmodifiableSet(new LinkedHashSet<>(this.contextMessageSources));
+ }
+
+ final String getIdentifier() {
+ return this.identifier;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java
new file mode 100644
index 0000000..e15e1f8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextManager.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.HeartBeatManager;
+import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Stack;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the stack of contexts in the Evaluator.
+ */
+@Private
+@EvaluatorSide
+@Provided
+public final class ContextManager implements AutoCloseable {
+
+ private static final Logger LOG = Logger.getLogger(ContextManager.class.getName());
+
+ /**
+ * The stack of contexts. Also used as the lock guarding access to the stack.
+ */
+ private final Stack<ContextRuntime> contextStack = new Stack<>();
+
+ /**
+ * Used to instantiate the root context.
+ */
+ private final InjectionFuture<RootContextLauncher> launchContext;
+
+ /**
+ * Used for status reporting to the Driver.
+ */
+ private final HeartBeatManager heartBeatManager;
+
+ /**
+ * To deserialize Configurations.
+ */
+ private final ConfigurationSerializer configurationSerializer;
+
+ private final ExceptionCodec exceptionCodec;
+
+ /**
+ * @param launchContext to instantiate the root context.
+ * @param heartBeatManager for status reporting to the Driver.
+ * @param configurationSerializer to deserialize the Configurations of contexts, services and tasks.
+ * @param exceptionCodec to encode the exceptions reported in failure status messages.
+ */
+ @Inject
+ ContextManager(final InjectionFuture<RootContextLauncher> launchContext,
+ final HeartBeatManager heartBeatManager,
+ final ConfigurationSerializer configurationSerializer,
+ final ExceptionCodec exceptionCodec) {
+ this.launchContext = launchContext;
+ this.heartBeatManager = heartBeatManager;
+ this.configurationSerializer = configurationSerializer;
+ this.exceptionCodec = exceptionCodec;
+ }
+
+ /**
+ * Start the context manager. This initiates the root context and launches the initial Task, if one is configured.
+ *
+ * @throws ContextClientCodeException if the root context can't be instantiated.
+ */
+ public void start() throws ContextClientCodeException {
+ synchronized (this.contextStack) {
+ LOG.log(Level.FINEST, "Instantiating root context.");
+ this.contextStack.push(this.launchContext.get().getRootContext());
+
+ if (this.launchContext.get().getInitialTaskConfiguration().isPresent()) {
+ LOG.log(Level.FINEST, "Launching the initial Task");
+ try {
+ this.contextStack.peek().startTask(this.launchContext.get().getInitialTaskConfiguration().get());
+ } catch (final TaskClientCodeException e) {
+ this.handleTaskException(e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Shuts down. This forcefully kills the Task if there is one and then shuts down all Contexts on the stack,
+ * starting at the top. NOTE(review): only the top context is closed here; presumably it closes its parents.
+ */
+ @Override
+ public void close() {
+ synchronized (this.contextStack) {
+ if (!this.contextStackIsEmpty()) {
+ this.contextStack.lastElement().close();
+ }
+ }
+ }
+
+ /**
+ * @return true if there is no context.
+ */
+ public boolean contextStackIsEmpty() {
+ synchronized (this.contextStack) {
+ return this.contextStack.isEmpty();
+ }
+ }
+
+ /**
+ * Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
+ * <p>
+ * This also triggers the HeartBeatManager to send a heartbeat with the result of this operation.
+ *
+ * @param controlMessage the message to process
+ */
+ public void handleContextControlProtocol(
+ final EvaluatorRuntimeProtocol.ContextControlProto controlMessage) {
+
+ synchronized (this.heartBeatManager) {
+ try {
+ if (controlMessage.hasAddContext() && controlMessage.hasRemoveContext()) {
+ throw new IllegalArgumentException(
+ "Received a message with both add and remove context. This is unsupported.");
+ }
+
+ final byte[] message = controlMessage.hasTaskMessage() ?
+ controlMessage.getTaskMessage().toByteArray() : null;
+
+ if (controlMessage.hasAddContext()) {
+ this.addContext(controlMessage.getAddContext());
+ if (controlMessage.hasStartTask()) {
+ // We support submitContextAndTask()
+ this.startTask(controlMessage.getStartTask());
+ } else {
+ // We need to trigger a heartbeat here.
+ // In other cases, the heartbeat will be triggered by the TaskRuntime
+ // Therefore this call can not go into addContext.
+ this.heartBeatManager.sendHeartbeat();
+ }
+ } else if (controlMessage.hasRemoveContext()) {
+ this.removeContext(controlMessage.getRemoveContext().getContextId());
+ } else if (controlMessage.hasStartTask()) {
+ this.startTask(controlMessage.getStartTask());
+ } else if (controlMessage.hasStopTask()) {
+ this.contextStack.peek().closeTask(message);
+ } else if (controlMessage.hasSuspendTask()) {
+ this.contextStack.peek().suspendTask(message);
+ } else if (controlMessage.hasTaskMessage()) {
+ this.contextStack.peek().deliverTaskMessage(message);
+ } else if (controlMessage.hasContextMessage()) {
+ final EvaluatorRuntimeProtocol.ContextMessageProto contextMessageProto = controlMessage.getContextMessage();
+ boolean deliveredMessage = false;
+ for (final ContextRuntime context : this.contextStack) {
+ if (context.getIdentifier().equals(contextMessageProto.getContextId())) {
+ context.handleContextMessage(contextMessageProto.getMessage().toByteArray());
+ deliveredMessage = true;
+ break;
+ }
+ }
+ if (!deliveredMessage) {
+ throw new IllegalStateException(
+ "Sent message to unknown context " + contextMessageProto.getContextId());
+ }
+ } else {
+ throw new RuntimeException("Unknown task control message: " + controlMessage);
+ }
+ } catch (final TaskClientCodeException e) {
+ this.handleTaskException(e);
+ } catch (final ContextClientCodeException e) {
+ this.handleContextException(e);
+ }
+ }
+
+ }
+
+ /**
+ * @return the TaskStatusProto of the currently running task, if there is any
+ */
+ public Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
+ synchronized (this.contextStack) {
+ if (this.contextStack.isEmpty()) {
+ throw new RuntimeException(
+ "Asked for a Task status while there isn't even a context running.");
+ }
+ return this.contextStack.peek().getTaskStatus();
+ }
+ }
+
+ /**
+ * @return the status of all contexts in the stack, in stack iteration order.
+ */
+ public Collection<ReefServiceProtos.ContextStatusProto> getContextStatusCollection() {
+ synchronized (this.contextStack) {
+ final List<ReefServiceProtos.ContextStatusProto> result = new ArrayList<>(this.contextStack.size());
+ for (final ContextRuntime contextRuntime : this.contextStack) {
+ final ReefServiceProtos.ContextStatusProto contextStatusProto = contextRuntime.getContextStatus();
+ LOG.log(Level.FINEST, "Add context status: {0}", contextStatusProto);
+ result.add(contextStatusProto);
+ }
+ return result;
+ }
+ }
+
+ /**
+ * Add a context to the stack.
+ *
+ * @param addContextProto names the parent context (must be the current top) and carries the new configuration(s).
+ * @throws ContextClientCodeException if there is a client code related issue.
+ */
+ private void addContext(
+ final EvaluatorRuntimeProtocol.AddContextProto addContextProto)
+ throws ContextClientCodeException {
+
+ synchronized (this.contextStack) {
+ try {
+
+ final ContextRuntime currentTopContext = this.contextStack.peek();
+
+ if (!currentTopContext.getIdentifier().equals(addContextProto.getParentContextId())) {
+ throw new IllegalStateException("Trying to instantiate a child context on context with id `" +
+ addContextProto.getParentContextId() + "` while the current top context id is `" +
+ currentTopContext.getIdentifier() + "`");
+ }
+
+ final Configuration contextConfiguration =
+ this.configurationSerializer.fromString(addContextProto.getContextConfiguration());
+
+ final ContextRuntime newTopContext;
+ if (addContextProto.hasServiceConfiguration()) {
+ newTopContext = currentTopContext.spawnChildContext(contextConfiguration,
+ this.configurationSerializer.fromString(addContextProto.getServiceConfiguration()));
+ } else {
+ newTopContext = currentTopContext.spawnChildContext(contextConfiguration);
+ }
+
+ this.contextStack.push(newTopContext);
+
+ } catch (final IOException | BindException e) {
+ throw new RuntimeException("Unable to read configuration.", e);
+ }
+
+ }
+ }
+
+ /**
+ * Remove the context with the given ID from the stack.
+ *
+ * @throws IllegalStateException if the given ID does not refer to the top of stack.
+ */
+ private void removeContext(final String contextID) {
+
+ synchronized (this.contextStack) {
+
+ if (!contextID.equals(this.contextStack.peek().getIdentifier())) {
+ throw new IllegalStateException("Trying to close context with id `" + contextID +
+ "`. But the top context has id `" +
+ this.contextStack.peek().getIdentifier() + "`");
+ }
+
+ this.contextStack.peek().close();
+ if (this.contextStack.size() > 1) {
+ /* We did not close the root context. Therefore, we need to inform the
+ * driver explicitly that this context is closed. The root context notification
+ * is implicit in the Evaluator close/done notification.
+ */
+ this.heartBeatManager.sendHeartbeat(); // Ensure Driver gets notified of context DONE state
+ }
+ this.contextStack.pop();
+ System.gc(); // TODO: is an explicit GC request after closing a context really needed?
+ }
+ }
+
+ /**
+ * Launch a Task on the top context, which must be the context named in the given StartTaskProto.
+ */
+ private void startTask(
+ final EvaluatorRuntimeProtocol.StartTaskProto startTaskProto) throws TaskClientCodeException {
+
+ synchronized (this.contextStack) {
+
+ final ContextRuntime currentActiveContext = this.contextStack.peek();
+
+ final String expectedContextId = startTaskProto.getContextId();
+ if (!expectedContextId.equals(currentActiveContext.getIdentifier())) {
+ throw new IllegalStateException("Task expected context `" + expectedContextId +
+ "` but the active context has ID `" + currentActiveContext.getIdentifier() + "`");
+ }
+
+ try {
+ final Configuration taskConfig =
+ this.configurationSerializer.fromString(startTaskProto.getConfiguration());
+ currentActiveContext.startTask(taskConfig);
+ } catch (IOException | BindException e) {
+ throw new RuntimeException("Unable to read configuration.", e);
+ }
+ }
+ }
+
+ /**
+ * Sends a FAILED TaskStatus. Assumes the caller holds the HeartBeatManager lock. NOTE(review): start() does not.
+ */
+ private void handleTaskException(final TaskClientCodeException e) {
+
+ LOG.log(Level.SEVERE, "TaskClientCodeException", e);
+
+ final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause()));
+
+ final ReefServiceProtos.TaskStatusProto taskStatus =
+ ReefServiceProtos.TaskStatusProto.newBuilder()
+ .setContextId(e.getContextId())
+ .setTaskId(e.getTaskId())
+ .setResult(exception)
+ .setState(ReefServiceProtos.State.FAILED)
+ .build();
+
+ LOG.log(Level.SEVERE, "Sending heartbeat: {0}", taskStatus);
+
+ this.heartBeatManager.sendTaskStatus(taskStatus);
+ }
+
+ /**
+ * Sends a FAIL ContextStatus for the given exception. Assumes the caller holds the HeartBeatManager lock.
+ */
+ private void handleContextException(final ContextClientCodeException e) {
+
+ LOG.log(Level.SEVERE, "ContextClientCodeException", e);
+
+ final ByteString exception = ByteString.copyFrom(this.exceptionCodec.toBytes(e.getCause()));
+
+ final ReefServiceProtos.ContextStatusProto.Builder contextStatusBuilder =
+ ReefServiceProtos.ContextStatusProto.newBuilder()
+ .setContextId(e.getContextID())
+ .setContextState(ReefServiceProtos.ContextStatusProto.State.FAIL)
+ .setError(exception);
+
+ if (e.getParentID().isPresent()) {
+ contextStatusBuilder.setParentId(e.getParentID().get());
+ }
+
+ final ReefServiceProtos.ContextStatusProto contextStatus = contextStatusBuilder.build();
+
+ LOG.log(Level.SEVERE, "Sending heartbeat: {0}", contextStatus);
+
+ this.heartBeatManager.sendContextStatus(contextStatus);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java
new file mode 100644
index 0000000..044f5c4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextRuntime.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.evaluator.context.ContextMessage;
+import org.apache.reef.evaluator.context.ContextMessageSource;
+import org.apache.reef.evaluator.context.parameters.Services;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException;
+import org.apache.reef.runtime.common.evaluator.task.TaskRuntime;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.util.Optional;
+
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The evaluator-side resource manager for Contexts.
+ */
+@Provided
+@Private
+@EvaluatorSide
+public final class ContextRuntime {
+
+ private static final Logger LOG = Logger.getLogger(ContextRuntime.class.getName());
+
+ /**
+ * Context-local injector. This contains information that will not be available in child injectors.
+ */
+ private final Injector contextInjector;
+
+ /**
+ * Service injector. State in this injector moves to child injectors.
+ */
+ private final Injector serviceInjector;
+
+ /**
+ * Convenience class to hold all the event handlers for the context as well as the service instances.
+ */
+ private final ContextLifeCycle contextLifeCycle;
+ /**
+ * The parent context, if there is any.
+ */
+ private final Optional<ContextRuntime> parentContext; // guarded by this
+ /**
+ * The child context, if there is any.
+ */
+ private Optional<ContextRuntime> childContext = Optional.empty(); // guarded by this
+ /**
+ * The currently running task, if there is any.
+ */
+ private Optional<TaskRuntime> task = Optional.empty(); // guarded by this
+
+ private Thread taskRuntimeThread = null;
+
+ // TODO: Which lock guards this?
+ private ReefServiceProtos.ContextStatusProto.State contextState =
+ ReefServiceProtos.ContextStatusProto.State.READY;
+
+  /**
+   * Create a new ContextRuntime.
+   * <p/>
+   * The services are requested from the serviceInjector first. The contextInjector is then forked from the
+   * serviceInjector with the given contextConfiguration, the ContextLifeCycle is obtained from it and started.
+   *
+   * @param serviceInjector      the serviceInjector to be used.
+   * @param contextConfiguration the Configuration for this context.
+   * @param parentContext        the parent of this context, if there is one.
+   * @throws ContextClientCodeException if the context cannot be instantiated.
+   */
+  ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration,
+                 final Optional<ContextRuntime> parentContext) throws ContextClientCodeException {
+
+    this.serviceInjector = serviceInjector;
+    this.parentContext = parentContext;
+
+    try {
+
+      // Trigger the instantiation of the services. The returned set itself is not used.
+      serviceInjector.getNamedInstance(Services.class);
+
+      this.contextInjector = serviceInjector.forkInjector(contextConfiguration);
+      this.contextLifeCycle = this.contextInjector.getInstance(ContextLifeCycle.class);
+
+    } catch (final BindException | InjectionException e) {
+
+      final Optional<String> parentID;
+      if (parentContext.isPresent()) {
+        parentID = Optional.of(parentContext.get().getIdentifier());
+      } else {
+        parentID = Optional.<String>empty();
+      }
+
+      throw new ContextClientCodeException(
+          ContextClientCodeException.getIdentifier(contextConfiguration),
+          parentID, "Unable to spawn context", e);
+    }
+
+    // Trigger the context start events on contextInjector.
+    this.contextLifeCycle.start();
+  }
+
+  /**
+   * Create a new ContextRuntime for the root context, i.e. a context without a parent.
+   * <p/>
+   * Delegates to the three-argument constructor with an empty parent.
+   *
+   * @param serviceInjector      the serviceInjector to be used.
+   * @param contextConfiguration the Configuration for this context.
+   * @throws ContextClientCodeException if the context cannot be instantiated.
+   */
+  ContextRuntime(final Injector serviceInjector, final Configuration contextConfiguration)
+      throws ContextClientCodeException {
+
+    this(serviceInjector, contextConfiguration, Optional.<ContextRuntime>empty());
+
+    LOG.log(Level.FINEST, "Instantiating root context");
+  }
+
+
+  /**
+   * Spawns a new context.
+   * <p/>
+   * The new context will have a serviceInjector that is created by forking the one in this object with the given
+   * serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+   *
+   * @param contextConfiguration the new context's context (local) Configuration.
+   * @param serviceConfiguration the new context's service Configuration.
+   * @return a child context.
+   * @throws ContextClientCodeException If the context can't be instantiated due to user code / configuration issues.
+   *                                    The exception names this context as the parent of the context that failed.
+   * @throws IllegalStateException      If this method is called when there is either a task or child context already
+   *                                    present.
+   */
+  ContextRuntime spawnChildContext(
+      final Configuration contextConfiguration,
+      final Configuration serviceConfiguration) throws ContextClientCodeException {
+
+    synchronized (this.contextLifeCycle) {
+
+      if (this.task.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to spawn a child context when a Task with id '" +
+                this.task.get().getId() + "' is running.");
+      }
+
+      if (this.childContext.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to instantiate a child context on a context that is not the topmost active context");
+      }
+
+      try {
+
+        final Injector childServiceInjector =
+            this.serviceInjector.forkInjector(serviceConfiguration);
+
+        final ContextRuntime childContext =
+            new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
+
+        this.childContext = Optional.of(childContext);
+        return childContext;
+
+      } catch (final BindException e) {
+
+        // The context that failed to spawn would have been a child of *this* context. Its parent ID is therefore
+        // this context's identifier, not the identifier of this context's own parent.
+        final Optional<String> parentID = Optional.of(this.getIdentifier());
+
+        throw new ContextClientCodeException(
+            ContextClientCodeException.getIdentifier(contextConfiguration),
+            parentID, "Unable to spawn context", e);
+      }
+    }
+  }
+
+  /**
+   * Spawns a new context without services of its own.
+   * <p/>
+   * The new context will have a serviceInjector that is created by forking the one in this object. The
+   * contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+   *
+   * @param contextConfiguration the new context's context (local) Configuration.
+   * @return a child context.
+   * @throws ContextClientCodeException If the context can't be instantiated due to user code / configuration issues.
+   * @throws IllegalStateException      If this method is called when there is either a task or child context already
+   *                                    present.
+   */
+  ContextRuntime spawnChildContext(
+      final Configuration contextConfiguration) throws ContextClientCodeException {
+
+    synchronized (this.contextLifeCycle) {
+
+      if (this.task.isPresent()) {
+        // Message fixed: the original read "Attempting to to spawn".
+        throw new IllegalStateException(
+            "Attempting to spawn a child context while a Task with id '" +
+                this.task.get().getId() + "' is running.");
+      }
+
+      if (this.childContext.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to spawn a child context on a context that is not the topmost active context");
+      }
+
+      final Injector childServiceInjector = this.serviceInjector.forkInjector();
+      final ContextRuntime childContext =
+          new ContextRuntime(childServiceInjector, contextConfiguration, Optional.of(this));
+
+      this.childContext = Optional.of(childContext);
+      return childContext;
+    }
+  }
+
+  /**
+   * Launches a Task on this context.
+   * <p/>
+   * A previously registered Task that has already ended is forgotten first. The new Task is instantiated from an
+   * injector forked off the contextInjector, initialized, and then run on a thread of its own.
+   *
+   * @param taskConfig the configuration to be used for the task.
+   * @throws org.apache.reef.runtime.common.evaluator.task.TaskClientCodeException If the Task cannot be
+   *                               instantiated or started due to user code / configuration issues.
+   * @throws IllegalStateException If this method is called when there is either a task or child context already
+   *                               present.
+   */
+  void startTask(final Configuration taskConfig) throws TaskClientCodeException {
+
+    synchronized (this.contextLifeCycle) {
+
+      if (this.task.isPresent()) {
+        if (this.task.get().hasEnded()) {
+          // clean up state
+          this.task = Optional.empty();
+        } else {
+          throw new IllegalStateException("Attempting to start a Task when a Task with id '" +
+              this.task.get().getId() + "' is running.");
+        }
+      }
+
+      if (this.childContext.isPresent()) {
+        throw new IllegalStateException(
+            "Attempting to start a Task on a context that is not the topmost active context");
+      }
+
+      try {
+        final TaskRuntime newTask =
+            this.contextInjector.forkInjector(taskConfig).getInstance(TaskRuntime.class);
+        newTask.initialize();
+
+        final Thread runner = new Thread(newTask, newTask.getId());
+        this.taskRuntimeThread = runner;
+        runner.start();
+
+        this.task = Optional.of(newTask);
+        LOG.log(Level.FINEST, "Started task: {0}", newTask.getTaskId());
+      } catch (final BindException | InjectionException e) {
+        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
+            this.getIdentifier(),
+            "Unable to instantiate the new task", e);
+      } catch (final Throwable t) {
+        throw new TaskClientCodeException(TaskClientCodeException.getTaskId(taskConfig),
+            this.getIdentifier(),
+            "Unable to start the new task", t);
+      }
+    }
+  }
+
+  /**
+   * Close this context.
+   * <p/>
+   * The context state is set to DONE. A Task registered with this context is closed (without a message), a child
+   * context is closed recursively, then this context's life cycle is closed and finally the parent context, if
+   * any, is told to forget this child.
+   */
+  final void close() {
+
+    synchronized (this.contextLifeCycle) {
+
+      this.contextState = ReefServiceProtos.ContextStatusProto.State.DONE;
+
+      if (this.task.isPresent()) {
+        LOG.log(Level.WARNING, "Shutting down a task because the underlying context is being closed.");
+        final TaskRuntime runningTask = this.task.get();
+        runningTask.close(null);
+      }
+
+      if (this.childContext.isPresent()) {
+        LOG.log(Level.WARNING, "Closing a context because its parent context is being closed.");
+        final ContextRuntime child = this.childContext.get();
+        child.close();
+      }
+
+      this.contextLifeCycle.close();
+
+      if (this.parentContext.isPresent()) {
+        final ContextRuntime parent = this.parentContext.get();
+        parent.resetChildContext();
+      }
+    }
+  }
+
+  /**
+   * Accessor for the parent of this context.
+   *
+   * @return the parent context, or an empty Optional if none was given at construction.
+   */
+  Optional<ContextRuntime> getParentContext() {
+    return parentContext;
+  }
+
+  /**
+   * Issue a suspend call to the Task.
+   * <p/>
+   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+   * in the log.
+   *
+   * @param message the suspend message to deliver or null if there is none.
+   */
+  void suspendTask(final byte[] message) {
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        this.task.get().suspend(message);
+      } else {
+        LOG.log(Level.WARNING, "Received a suspend task while there was no task running. Ignoring.");
+      }
+    }
+  }
+
+  /**
+   * Issue a close call to the Task.
+   * <p/>
+   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+   * in the log.
+   *
+   * @param message the close message to deliver or null if there is none.
+   */
+  void closeTask(final byte[] message) {
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        this.task.get().close(message);
+      } else {
+        LOG.log(Level.WARNING, "Received a close task while there was no task running. Ignoring.");
+      }
+    }
+  }
+
+  /**
+   * Deliver a message to the Task.
+   * <p/>
+   * Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+   * in the log.
+   *
+   * @param message the message to deliver to the Task.
+   */
+  void deliverTaskMessage(final byte[] message) {
+    synchronized (this.contextLifeCycle) {
+      if (this.task.isPresent()) {
+        this.task.get().deliver(message);
+      } else {
+        LOG.log(Level.WARNING, "Received a task message while there was no task running. Ignoring.");
+      }
+    }
+  }
+
+  /**
+   * Accessor for this context's identifier, as reported by its ContextLifeCycle.
+   *
+   * @return the identifier of this context.
+   */
+  String getIdentifier() {
+    final String identifier = this.contextLifeCycle.getIdentifier();
+    return identifier;
+  }
+
+  /**
+   * Handle the context message by passing it on to this context's ContextLifeCycle.
+   *
+   * @param message sent by the driver
+   */
+  final void handleContextMessage(final byte[] message) {
+    contextLifeCycle.handleContextMessage(message);
+  }
+
+  /**
+   * Reports the status of the Task registered with this context.
+   * <p/>
+   * A Task that has already ended is forgotten as a side effect of this call.
+   *
+   * @return the state of the running Task, if one is running.
+   */
+  Optional<ReefServiceProtos.TaskStatusProto> getTaskStatus() {
+    synchronized (this.contextLifeCycle) {
+
+      if (!this.task.isPresent()) {
+        return Optional.empty();
+      }
+
+      final TaskRuntime currentTask = this.task.get();
+      if (currentTask.hasEnded()) {
+        // clean up state
+        this.task = Optional.empty();
+        return Optional.empty();
+      }
+
+      return Optional.of(currentTask.getStatusProto());
+    }
+  }
+
+  /**
+   * Called by the child context when it has been closed: forgets the child.
+   *
+   * @throws IllegalStateException if there is no child context to forget.
+   */
+  private void resetChildContext() {
+    synchronized (this.contextLifeCycle) {
+      if (!this.childContext.isPresent()) {
+        throw new IllegalStateException("no child context set");
+      }
+      this.childContext = Optional.empty();
+    }
+  }
+
+  /**
+   * Assembles the status of this context.
+   * <p/>
+   * The status carries the context identifier, the current context state, the parent's identifier (if there is a
+   * parent) and one entry per ContextMessageSource that currently has a message.
+   *
+   * @return this context's status in protocol buffer form.
+   */
+  ReefServiceProtos.ContextStatusProto getContextStatus() {
+
+    synchronized (this.contextLifeCycle) {
+
+      final ReefServiceProtos.ContextStatusProto.Builder statusBuilder =
+          ReefServiceProtos.ContextStatusProto.newBuilder();
+      statusBuilder.setContextId(this.getIdentifier());
+      statusBuilder.setContextState(this.contextState);
+
+      if (this.parentContext.isPresent()) {
+        statusBuilder.setParentId(this.parentContext.get().getIdentifier());
+      }
+
+      for (final ContextMessageSource source : this.contextLifeCycle.getContextMessageSources()) {
+        // Each source is asked exactly once per status.
+        final Optional<ContextMessage> pending = source.getMessage();
+        if (!pending.isPresent()) {
+          continue;
+        }
+        final ContextMessage contextMessage = pending.get();
+        statusBuilder.addContextMessage(ReefServiceProtos.ContextStatusProto.ContextMessageProto.newBuilder()
+            .setSourceId(contextMessage.getMessageSourceID())
+            .setMessage(ByteString.copyFrom(contextMessage.get()))
+            .build());
+      }
+
+      return statusBuilder.build();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java
new file mode 100644
index 0000000..8ca4349
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStartImpl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+
+import org.apache.reef.evaluator.context.events.ContextStart;
+
+/**
+ * Implementation of ContextStart that simply carries the identifier it was constructed with.
+ */
+final class ContextStartImpl implements ContextStart {
+
+  /**
+   * The identifier returned by getId().
+   */
+  private final String id;
+
+  /**
+   * @param identifier the identifier to be returned by getId().
+   */
+  ContextStartImpl(final String identifier) {
+    id = identifier;
+  }
+
+  /**
+   * @return the identifier given at construction.
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java
new file mode 100644
index 0000000..74e1b10
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/ContextStopImpl.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.evaluator.context.events.ContextStop;
+
+/**
+ * Implementation of ContextStop that simply carries the identifier it was constructed with.
+ */
+final class ContextStopImpl implements ContextStop {
+
+  /**
+   * The identifier returned by getId().
+   */
+  private final String id;
+
+  /**
+   * @param identifier the identifier to be returned by getId().
+   */
+  ContextStopImpl(final String identifier) {
+    id = identifier;
+  }
+
+  /**
+   * @return the identifier given at construction.
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java
new file mode 100644
index 0000000..448949c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/RootContextLauncher.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
+
+import org.apache.reef.runtime.common.evaluator.parameters.InitialTaskConfiguration;
+import org.apache.reef.runtime.common.evaluator.parameters.RootContextConfiguration;
+import org.apache.reef.runtime.common.evaluator.parameters.RootServiceConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.io.IOException;
+
+/**
+ * Helper class that encapsulates the root context configuration: With or without services and an initial task.
+ * <p/>
+ * Each of the four injectable constructors covers one combination of "root service configuration given / not given"
+ * and "initial task configuration given / not given". All configurations arrive in serialized (String) form and
+ * are deserialized with the ConfigurationSerializer.
+ */
+final class RootContextLauncher {
+
+  private final Injector injector;
+  private final Configuration rootContextConfiguration;
+  private final Optional<Configuration> rootServiceConfiguration;
+  private final Optional<Configuration> initialTaskConfiguration;
+  private final ConfigurationSerializer configurationSerializer;
+  /**
+   * The root context. Created on the first call to getRootContext(); null until then.
+   */
+  private ContextRuntime rootContext = null;
+
+  /**
+   * Root context with services and an initial task.
+   *
+   * @param rootContextConfiguration the serialized root context configuration.
+   * @param rootServiceConfiguration the serialized root service configuration.
+   * @param initialTaskConfiguration the serialized configuration of the initial task.
+   * @param injector                 the injector the root context's injectors are forked from.
+   * @param configurationSerializer  used to deserialize the configurations.
+   * @throws IOException   declared for the deserialization of the configurations.
+   * @throws BindException declared for the deserialization of the configurations.
+   */
+  @Inject
+  RootContextLauncher(final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final @Parameter(RootServiceConfiguration.class) String rootServiceConfiguration,
+                      final @Parameter(InitialTaskConfiguration.class) String initialTaskConfiguration,
+                      final Injector injector, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.of(this.configurationSerializer.fromString(rootServiceConfiguration));
+    this.initialTaskConfiguration = Optional.of(this.configurationSerializer.fromString(initialTaskConfiguration));
+  }
+
+  /**
+   * Root context with services, but without an initial task.
+   *
+   * @param rootContextConfiguration the serialized root context configuration.
+   * @param injector                 the injector the root context's injectors are forked from.
+   * @param rootServiceConfiguration the serialized root service configuration.
+   * @param configurationSerializer  used to deserialize the configurations.
+   * @throws IOException   declared for the deserialization of the configurations.
+   * @throws BindException declared for the deserialization of the configurations.
+   */
+  @Inject
+  RootContextLauncher(final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final Injector injector,
+                      final @Parameter(RootServiceConfiguration.class) String rootServiceConfiguration, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.of(this.configurationSerializer.fromString(rootServiceConfiguration));
+    this.initialTaskConfiguration = Optional.empty();
+  }
+
+  /**
+   * Root context without services, but with an initial task.
+   *
+   * @param injector                 the injector the root context's injectors are forked from.
+   * @param rootContextConfiguration the serialized root context configuration.
+   * @param initialTaskConfiguration the serialized configuration of the initial task.
+   * @param configurationSerializer  used to deserialize the configurations.
+   * @throws IOException   declared for the deserialization of the configurations.
+   * @throws BindException declared for the deserialization of the configurations.
+   */
+  @Inject
+  RootContextLauncher(final Injector injector,
+                      final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final @Parameter(InitialTaskConfiguration.class) String initialTaskConfiguration, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.empty();
+    this.initialTaskConfiguration = Optional.of(this.configurationSerializer.fromString(initialTaskConfiguration));
+  }
+
+  /**
+   * Root context without services and without an initial task.
+   *
+   * @param rootContextConfiguration the serialized root context configuration.
+   * @param injector                 the injector the root context's injectors are forked from.
+   * @param configurationSerializer  used to deserialize the configuration.
+   * @throws IOException   declared for the deserialization of the configuration.
+   * @throws BindException declared for the deserialization of the configuration.
+   */
+  @Inject
+  RootContextLauncher(final @Parameter(RootContextConfiguration.class) String rootContextConfiguration,
+                      final Injector injector, final ConfigurationSerializer configurationSerializer) throws IOException, BindException {
+    this.injector = injector;
+    this.configurationSerializer = configurationSerializer;
+    this.rootContextConfiguration = this.configurationSerializer.fromString(rootContextConfiguration);
+    this.rootServiceConfiguration = Optional.empty();
+    this.initialTaskConfiguration = Optional.empty();
+  }
+
+  /**
+   * Instantiates the root context.
+   * <p/>
+   * If a root service configuration is present, the given injector is forked with it; otherwise the injector is
+   * forked without additional configuration. The resulting injector becomes the root context's serviceInjector.
+   * This method does not launch the initial task; callers obtain its configuration via
+   * getInitialTaskConfiguration().
+   *
+   * @param injector                 the injector to fork the root context's serviceInjector from.
+   * @param rootContextConfiguration the configuration of the root context.
+   * @param rootServiceConfiguration the configuration of the root context's services, if there are any.
+   * @return the newly created root ContextRuntime.
+   * @throws ContextClientCodeException if the service injector cannot be forked or the context cannot be
+   *                                    instantiated.
+   */
+  private static ContextRuntime getRootContext(final Injector injector,
+                                               final Configuration rootContextConfiguration,
+                                               final Optional<Configuration> rootServiceConfiguration)
+      throws ContextClientCodeException {
+    final ContextRuntime result;
+    if (rootServiceConfiguration.isPresent()) {
+      final Injector rootServiceInjector;
+      try {
+        rootServiceInjector = injector.forkInjector(rootServiceConfiguration.get());
+      } catch (final BindException e) {
+        // The root context has no parent, hence the empty parent ID.
+        throw new ContextClientCodeException(ContextClientCodeException.getIdentifier(rootContextConfiguration),
+            Optional.<String>empty(), "Unable to instantiate the root context", e);
+      }
+      result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+    } else {
+      result = new ContextRuntime(injector.forkInjector(), rootContextConfiguration);
+    }
+    return result;
+  }
+
+  /**
+   * Returns the root context, creating it on the first call. Later calls return the same instance.
+   * This method is not synchronized.
+   *
+   * @return the root context for this evaluator.
+   * @throws ContextClientCodeException if the root context cannot be instantiated.
+   */
+  final ContextRuntime getRootContext() throws ContextClientCodeException {
+    if (null == this.rootContext) {
+      this.rootContext = getRootContext(this.injector, this.rootContextConfiguration, this.rootServiceConfiguration);
+    }
+    return this.rootContext;
+  }
+
+  /**
+   * @return the configuration of the initial task, if one was given at construction.
+   */
+  final Optional<Configuration> getInitialTaskConfiguration() {
+    return this.initialTaskConfiguration;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java
new file mode 100644
index 0000000..0007c00
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageHandler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.ContextMessageHandler;
+import org.apache.reef.evaluator.context.parameters.ContextIdentifier;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Default handler for messages sent by the driver: Crash the context.
+ * <p/>
+ * Every message is answered with an IllegalStateException that names the context.
+ */
+@EvaluatorSide
+public final class DefaultContextMessageHandler implements ContextMessageHandler {
+
+  /**
+   * Identifier of the context, used in the exception message only.
+   */
+  private final String contextId;
+
+  /**
+   * @param contextID the identifier of the context this handler is bound to.
+   */
+  @Inject
+  DefaultContextMessageHandler(final @Parameter(ContextIdentifier.class) String contextID) {
+    contextId = contextID;
+  }
+
+  /**
+   * Rejects the message.
+   *
+   * @param message the message sent by the driver; its content is ignored.
+   * @throws IllegalStateException always.
+   */
+  @Override
+  public void onNext(final byte[] message) {
+    final String errorMessage = "No message handlers given for context " + contextId;
+    throw new IllegalStateException(errorMessage);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java
new file mode 100644
index 0000000..ed7a7af
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextMessageSource.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.Provided;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.ContextMessage;
+import org.apache.reef.evaluator.context.ContextMessageSource;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+
+/**
+ * Default ContextMessageSource: never has a message to send.
+ */
+@EvaluatorSide
+@Provided
+public final class DefaultContextMessageSource implements ContextMessageSource {
+
+  /**
+   * Injectable no-argument constructor.
+   */
+  @Inject
+  public DefaultContextMessageSource() {
+  }
+
+  /**
+   * @return always an empty Optional.
+   */
+  @Override
+  public Optional<ContextMessage> getMessage() {
+    final Optional<ContextMessage> noMessage = Optional.empty();
+    return noMessage;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java
new file mode 100644
index 0000000..a3fbce5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStartHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.events.ContextStart;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for ContextStart: logs the event at INFO level and does nothing else.
+ */
+@EvaluatorSide
+public final class DefaultContextStartHandler implements EventHandler<ContextStart> {
+
+  /**
+   * Named after the class via getName(). Class.toString() would yield "class org.apache..." and thereby a logger
+   * outside of the package-based logger hierarchy.
+   */
+  private static final Logger LOG = Logger.getLogger(DefaultContextStartHandler.class.getName());
+
+  @Inject
+  DefaultContextStartHandler() {
+  }
+
+  /**
+   * @param contextStart the event to log.
+   */
+  @Override
+  public void onNext(final ContextStart contextStart) {
+    LOG.log(Level.INFO, "DefaultContextStartHandler received: {0}", contextStart);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java
new file mode 100644
index 0000000..4031a4b
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/DefaultContextStopHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
+
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.evaluator.context.events.ContextStop;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for {@link ContextStop} events.
+ * <p>
+ * It logs every received event at INFO level and takes no other action.
+ */
+@EvaluatorSide
+public final class DefaultContextStopHandler implements EventHandler<ContextStop> {
+
+  /**
+   * Logger for this handler.
+   * <p>
+   * Named with {@link Class#getName()} rather than {@link Class#toString()}: the latter
+   * prefixes the name with "class ", which would place the logger outside the
+   * package-based logger hierarchy.
+   */
+  private static final Logger LOG = Logger.getLogger(DefaultContextStopHandler.class.getName());
+
+  /**
+   * Package-private constructor, annotated with {@code @Inject}.
+   */
+  @Inject
+  DefaultContextStopHandler() {
+  }
+
+  /**
+   * Logs the received event at INFO level.
+   *
+   * @param contextStop the event to log.
+   */
+  @Override
+  public void onNext(final ContextStop contextStop) {
+    // Parameterized message: the event is only converted to a String if the record is formatted.
+    LOG.log(Level.INFO, "DefaultContextStopHandler received: {0}", contextStop);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java
new file mode 100644
index 0000000..5e572c7
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional context interfaces.
+ */
+package org.apache.reef.runtime.common.evaluator.context.defaults;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java
new file mode 100644
index 0000000..f7d4390
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/context/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Context implementation of the Evaluator resourcemanager.
+ */
+package org.apache.reef.runtime.common.evaluator.context;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java
new file mode 100644
index 0000000..78acce3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Evaluator-side implementation of the REEF API.
+ */
+package org.apache.reef.runtime.common.evaluator;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java
new file mode 100644
index 0000000..6d405f8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/ApplicationIdentifier.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The RM application/job identifier.
+ * <p/>
+ * In YARN, this is the applicationID assigned by the resource manager.
+ */
+@NamedParameter(doc = "The RM application/job identifier.")
+public final class ApplicationIdentifier implements Name<String> {
+ private ApplicationIdentifier() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java
new file mode 100644
index 0000000..b9e803e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/DriverRemoteIdentifier.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The identifier used by the Evaluator to connect back to the Driver.
+ */
+@NamedParameter(doc = "The identifier used by the Evaluator to connect back to the Driver.")
+public final class DriverRemoteIdentifier implements Name<String> {
+ private DriverRemoteIdentifier() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java
new file mode 100644
index 0000000..d0fa894
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorIdentifier.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The evaluator identifier.
+ */
+@NamedParameter(doc = "The evaluator identifier.")
+public final class EvaluatorIdentifier implements Name<String> {
+ private EvaluatorIdentifier() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java
new file mode 100644
index 0000000..20804c6
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/HeartbeatPeriod.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The evaluator heartbeat period in ms.
+ */
+@NamedParameter(doc = "The evaluator heartbeat period in ms.", default_value = "5000")
+public final class HeartbeatPeriod implements Name<Integer> {
+ private HeartbeatPeriod() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java
new file mode 100644
index 0000000..b9d80cd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/InitialTaskConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * An initial task to launch on startup.
+ */
+@NamedParameter(doc = "An initial task to launch on startup.")
+public final class InitialTaskConfiguration implements Name<String> {
+ private InitialTaskConfiguration() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java
new file mode 100644
index 0000000..d4bc566
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootContextConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The evaluator root context configuration.
+ */
+@NamedParameter(doc = "The evaluator root context configuration.")
+public final class RootContextConfiguration implements Name<String> {
+ private RootContextConfiguration() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java
new file mode 100644
index 0000000..aabc326
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/RootServiceConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator.parameters;
+
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+
+/**
+ * The service configuration for the root context
+ */
+@NamedParameter(doc = "The service configuration for the root context")
+public final class RootServiceConfiguration implements Name<String> {
+ private RootServiceConfiguration() {
+ }
+}