You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2014/10/23 02:04:00 UTC
[29/51] [abbrv] [partial] Initial merge of Wake,
Tang and REEF into one repository and project
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
new file mode 100644
index 0000000..1d88147
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/catalog/ResourceCatalogImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.catalog;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.RackDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto;
+
+import javax.inject.Inject;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Private
+public final class ResourceCatalogImpl implements ResourceCatalog {
+
+ public final static String DEFAULT_RACK = "/default-rack";
+ private static final Logger LOG = Logger.getLogger(ResourceCatalog.class.getName());
+ private final Map<String, RackDescriptorImpl> racks = new HashMap<>();
+
+ private final Map<String, NodeDescriptorImpl> nodes = new HashMap<>();
+
+ @Inject
+ ResourceCatalogImpl() {
+ LOG.log(Level.FINE, "Instantiated 'ResourceCatalogImpl'");
+ }
+
+ @Override
+ public synchronized String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("=== Resource Catalog ===");
+ for (final RackDescriptor rack : racks.values()) {
+ sb.append("\n" + rack);
+ }
+ return sb.toString();
+ }
+
+ @Override
+ public synchronized Collection<NodeDescriptor> getNodes() {
+ return Collections.unmodifiableCollection(new ArrayList<NodeDescriptor>(this.nodes.values()));
+ }
+
+ @Override
+ public synchronized Collection<RackDescriptor> getRacks() {
+ return Collections.unmodifiableCollection(new ArrayList<RackDescriptor>(this.racks.values()));
+ }
+
+ public synchronized final NodeDescriptor getNode(final String id) {
+ return this.nodes.get(id);
+ }
+
+ public synchronized final void handle(final NodeDescriptorProto node) {
+ final String rack_name = (node.hasRackName() ? node.getRackName() : DEFAULT_RACK);
+
+ LOG.log(Level.FINEST, "Catalog new node: id[{0}], rack[{1}], host[{2}], port[{3}], memory[{4}]",
+ new Object[]{node.getIdentifier(), rack_name, node.getHostName(), node.getPort(),
+ node.getMemorySize()}
+ );
+
+ if (!this.racks.containsKey(rack_name)) {
+ final RackDescriptorImpl rack = new RackDescriptorImpl(rack_name);
+ this.racks.put(rack_name, rack);
+ }
+ final RackDescriptorImpl rack = this.racks.get(rack_name);
+ final InetSocketAddress address = new InetSocketAddress(node.getHostName(), node.getPort());
+ final NodeDescriptorImpl nodeDescriptor = new NodeDescriptorImpl(node.getIdentifier(), address, rack, node.getMemorySize());
+ this.nodes.put(nodeDescriptor.getId(), nodeDescriptor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
new file mode 100644
index 0000000..42dd3e8
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientConnection.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the communication channel to the client.
+ */
+@DriverSide
+public final class ClientConnection {
+
+  private static final Logger LOG = Logger.getLogger(ClientConnection.class.getName());
+
+  private final EventHandler<ReefServiceProtos.JobStatusProto> jobStatusHandler;
+  private final String jobIdentifier;
+
+  /**
+   * @param remoteManager used to obtain the handler that delivers job status messages to the client.
+   * @param clientRID     the remote identifier of the client. If it equals
+   *                      {@code ClientRemoteIdentifier.NONE}, job status messages are handed to a
+   *                      {@link LoggingJobStatusHandler} instead of being sent.
+   * @param jobIdentifier the identifier put into the messages built by {@link #sendMessage(byte[])}.
+   */
+  @Inject
+  public ClientConnection(
+      final RemoteManager remoteManager,
+      final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+      final @Parameter(AbstractDriverRuntimeConfiguration.JobIdentifier.class) String jobIdentifier) {
+    this.jobIdentifier = jobIdentifier;
+    if (clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+      LOG.log(Level.FINE, "Instantiated 'ClientConnection' without an actual connection to the client.");
+      this.jobStatusHandler = new LoggingJobStatusHandler();
+    } else {
+      this.jobStatusHandler = remoteManager.getHandler(clientRID, ReefServiceProtos.JobStatusProto.class);
+      LOG.log(Level.FINE, "Instantiated 'ClientConnection'");
+    }
+  }
+
+  /**
+   * Send the given JobStatus to the client.
+   *
+   * @param status the job status to hand to the job status handler.
+   */
+  public synchronized void send(final ReefServiceProtos.JobStatusProto status) {
+    // Rendering the status as text is only done when FINEST is actually enabled.
+    if (LOG.isLoggable(Level.FINEST)) {
+      LOG.log(Level.FINEST, "Sending:\n" + status);
+    }
+    this.jobStatusHandler.onNext(status);
+  }
+
+  /**
+   * Send the given byte[] as a message to the client. The bytes are wrapped into a JobStatus
+   * carrying this job's identifier and the state RUNNING.
+   *
+   * @param message the payload to send; it is copied into the outgoing message.
+   */
+  public synchronized void sendMessage(final byte[] message) {
+    this.send(ReefServiceProtos.JobStatusProto.newBuilder()
+        .setIdentifier(this.jobIdentifier)
+        .setState(ReefServiceProtos.State.RUNNING)
+        .setMessage(ByteString.copyFrom(message))
+        .build());
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
new file mode 100644
index 0000000..9b40f09
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/ClientManager.java
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.parameters.ClientCloseHandlers;
+import org.apache.reef.driver.parameters.ClientCloseWithMessageHandlers;
+import org.apache.reef.driver.parameters.ClientMessageHandlers;
+import org.apache.reef.proto.ClientRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration;
+import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents the Client in the Driver.
+ * <p>
+ * Receives the control messages sent by the client and forwards them to the handlers bound for
+ * client messages and client close events.
+ */
+@Private
+@DriverSide
+public final class ClientManager implements EventHandler<ClientRuntimeProtocol.JobControlProto> {
+
+  private final static Logger LOG = Logger.getLogger(ClientManager.class.getName());
+
+  private final InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers;
+
+  private final InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers;
+
+  private final InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers;
+
+  private final DriverStatusManager driverStatusManager;
+
+  // The dispatchers below are created on first use from the injection futures above.
+  private volatile EventHandler<Void> clientCloseDispatcher;
+
+  private volatile EventHandler<byte[]> clientCloseWithMessageDispatcher;
+
+  private volatile EventHandler<byte[]> clientMessageDispatcher;
+
+  /**
+   * @param clientCloseHandlers            future of the handlers invoked on a close without message.
+   * @param clientCloseWithMessageHandlers future of the handlers invoked on a close with message.
+   * @param clientMessageHandlers          future of the handlers invoked on a plain client message.
+   * @param clientRID                      the remote identifier of the client. Unless it equals
+   *                                       {@code ClientRemoteIdentifier.NONE}, this instance is
+   *                                       registered as the handler for JobControlProto from it.
+   * @param remoteManager                  used to register this instance as a message handler.
+   * @param driverStatusManager            notified via onComplete() once a terminate signal was handled.
+   */
+  @Inject
+  ClientManager(final @Parameter(ClientCloseHandlers.class) InjectionFuture<Set<EventHandler<Void>>> clientCloseHandlers,
+                final @Parameter(ClientCloseWithMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientCloseWithMessageHandlers,
+                final @Parameter(ClientMessageHandlers.class) InjectionFuture<Set<EventHandler<byte[]>>> clientMessageHandlers,
+                final @Parameter(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.class) String clientRID,
+                final RemoteManager remoteManager,
+                final DriverStatusManager driverStatusManager) {
+    this.driverStatusManager = driverStatusManager;
+    this.clientCloseHandlers = clientCloseHandlers;
+    this.clientCloseWithMessageHandlers = clientCloseWithMessageHandlers;
+    this.clientMessageHandlers = clientMessageHandlers;
+
+    if (!clientRID.equals(AbstractDriverRuntimeConfiguration.ClientRemoteIdentifier.NONE)) {
+      remoteManager.registerHandler(clientRID, ClientRuntimeProtocol.JobControlProto.class, this);
+    } else {
+      LOG.log(Level.FINE, "Not registering a handler for JobControlProto, as there is no client.");
+    }
+  }
+
+  /**
+   * This method reacts to control messages passed by the client to the driver. It will forward
+   * messages related to the ClientObserver interface to the Driver. It will also initiate a shutdown
+   * if the client indicates a close message.
+   * <p>
+   * A message that carries a signal other than SIG_TERMINATE is logged and otherwise ignored, even
+   * if it also carries a payload.
+   *
+   * @param jobControlProto contains the client initiated control message
+   */
+  @Override
+  public synchronized void onNext(final ClientRuntimeProtocol.JobControlProto jobControlProto) {
+    if (jobControlProto.hasSignal()) {
+      if (jobControlProto.getSignal() == ClientRuntimeProtocol.Signal.SIG_TERMINATE) {
+        try {
+          if (jobControlProto.hasMessage()) {
+            getClientCloseWithMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+          } else {
+            getClientCloseDispatcher().onNext(null);
+          }
+        } finally {
+          // Runs even when a close handler throws.
+          this.driverStatusManager.onComplete();
+        }
+      } else {
+        LOG.log(Level.FINEST, "Unsupported signal: " + jobControlProto.getSignal());
+      }
+    } else if (jobControlProto.hasMessage()) {
+      getClientMessageDispatcher().onNext(jobControlProto.getMessage().toByteArray());
+    }
+  }
+
+  /**
+   * @return the dispatcher for close events without a message, created on first use. The field is
+   * checked again under the lock so that at most one dispatcher is created.
+   */
+  private EventHandler<Void> getClientCloseDispatcher() {
+    if (this.clientCloseDispatcher == null) {
+      synchronized (this) {
+        if (this.clientCloseDispatcher == null) {
+          this.clientCloseDispatcher = new BroadCastEventHandler<>(this.clientCloseHandlers.get());
+        }
+      }
+    }
+    return this.clientCloseDispatcher;
+  }
+
+  /**
+   * @return the dispatcher for close events that carry a message, created on first use. The field
+   * is checked again under the lock so that at most one dispatcher is created.
+   */
+  private EventHandler<byte[]> getClientCloseWithMessageDispatcher() {
+    if (this.clientCloseWithMessageDispatcher == null) {
+      synchronized (this) {
+        if (this.clientCloseWithMessageDispatcher == null) {
+          this.clientCloseWithMessageDispatcher =
+              new BroadCastEventHandler<>(this.clientCloseWithMessageHandlers.get());
+        }
+      }
+    }
+    return this.clientCloseWithMessageDispatcher;
+  }
+
+  /**
+   * @return the dispatcher for plain client messages, created on first use. The field is checked
+   * again under the lock so that at most one dispatcher is created.
+   */
+  private EventHandler<byte[]> getClientMessageDispatcher() {
+    if (this.clientMessageDispatcher == null) {
+      synchronized (this) {
+        if (this.clientMessageDispatcher == null) {
+          this.clientMessageDispatcher = new BroadCastEventHandler<>(this.clientMessageHandlers.get());
+        }
+      }
+    }
+    return this.clientMessageDispatcher;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java
new file mode 100644
index 0000000..31f6f39
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/JobMessageObserverImpl.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.driver.client.JobMessageObserver;
+
+import javax.inject.Inject;
+
+/**
+ * {@link JobMessageObserver} that passes every message on to the {@link ClientConnection}.
+ */
+@DriverSide
+public final class JobMessageObserverImpl implements JobMessageObserver {
+
+  /**
+   * The channel all messages are forwarded to.
+   */
+  private final ClientConnection connection;
+
+  /**
+   * @param clientConnection the connection used to deliver messages to the client.
+   */
+  @Inject
+  public JobMessageObserverImpl(final ClientConnection clientConnection) {
+    connection = clientConnection;
+  }
+
+  /**
+   * Forwards the given bytes via {@link ClientConnection#sendMessage(byte[])}.
+   *
+   * @param message the payload to send to the client.
+   */
+  @Override
+  public void sendMessageToClient(final byte[] message) {
+    connection.sendMessage(message);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
new file mode 100644
index 0000000..028b8cd
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/LoggingJobStatusHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Job status handler that does not send anything: each message it receives is written to the
+ * log at level INFO.
+ */
+final class LoggingJobStatusHandler implements EventHandler<ReefServiceProtos.JobStatusProto> {
+  private static final Logger LOG = Logger.getLogger(LoggingJobStatusHandler.class.getName());
+
+  @Inject
+  LoggingJobStatusHandler() {
+  }
+
+  /**
+   * Logs the given job status.
+   *
+   * @param jobStatusProto the status message to log.
+   */
+  @Override
+  public void onNext(final ReefServiceProtos.JobStatusProto jobStatusProto) {
+    final String rendered = jobStatusProto.toString();
+    LOG.log(Level.INFO, "Received a JobStatus message that can't be sent:\n" + rendered);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java
new file mode 100644
index 0000000..c6b0d6f
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/client/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Objects representing the Client on the Driver.
+ */
+
+@DriverSide package org.apache.reef.runtime.common.driver.client;
+
+import org.apache.reef.annotations.audience.DriverSide;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java
new file mode 100644
index 0000000..6464558
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ClosedContextImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Immutable driver-side record of a context that has been closed.
+ */
+@DriverSide
+@Private
+public final class ClosedContextImpl implements ClosedContext {
+
+  private final ActiveContext parent;
+  private final String id;
+  private final String evaluator;
+  private final EvaluatorDescriptor descriptor;
+
+  /**
+   * @param parentContext       the parent context.
+   * @param contextID           the id of the closed context
+   * @param evaluatorId         the id of the evaluator on which the context was closed
+   * @param evaluatorDescriptor the descriptor of the evaluator on which the context was closed.
+   */
+  public ClosedContextImpl(final ActiveContext parentContext,
+                           final String contextID,
+                           final String evaluatorId,
+                           final EvaluatorDescriptor evaluatorDescriptor) {
+    parent = parentContext;
+    id = contextID;
+    evaluator = evaluatorId;
+    descriptor = evaluatorDescriptor;
+  }
+
+  /**
+   * @return the parent context given at construction time.
+   */
+  @Override
+  public ActiveContext getParentContext() {
+    return parent;
+  }
+
+  /**
+   * @return the id of the closed context.
+   */
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * @return the id of the evaluator on which the context was closed.
+   */
+  @Override
+  public String getEvaluatorId() {
+    return evaluator;
+  }
+
+  /**
+   * @return the id of the parent context.
+   */
+  @Override
+  public Optional<String> getParentId() {
+    // NOTE(review): dereferences the parent unconditionally, so this presumably relies on the
+    // parent context never being null - confirm against the callers of the constructor.
+    final String parentId = parent.getId();
+    return Optional.of(parentId);
+  }
+
+  /**
+   * @return the descriptor of the evaluator on which the context was closed.
+   */
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return descriptor;
+  }
+
+  @Override
+  public String toString() {
+    return "ClosedContext{contextID='" + id + "'}";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java
new file mode 100644
index 0000000..0c5ba15
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextControlHandler.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorControlHandler;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Sends context control messages to one evaluator by wrapping them into evaluator control
+ * messages.
+ */
+@DriverSide
+@Private
+public class ContextControlHandler {
+  private static final Logger LOG = Logger.getLogger(ContextControlHandler.class.getName());
+  private final EvaluatorControlHandler evaluatorControlHandler;
+  private final String evaluatorId;
+
+  /**
+   * @param evaluatorControlHandler used to send the actual evaluator control messages.
+   * @param evaluatorId             the ID of the evaluator this ContextControlHandler communicates with.
+   */
+  @Inject
+  ContextControlHandler(final EvaluatorControlHandler evaluatorControlHandler,
+                        final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorId) {
+    this.evaluatorControlHandler = evaluatorControlHandler;
+    this.evaluatorId = evaluatorId;
+    LOG.log(Level.FINE, "Instantiated 'ContextControlHandler'");
+  }
+
+  /**
+   * Wraps the given context control message into an evaluator control message stamped with the
+   * current time and this handler's evaluator id, then hands it to the evaluator control handler.
+   *
+   * @param contextControlProto the context control message to send.
+   */
+  public synchronized void send(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
+    final EvaluatorRuntimeProtocol.EvaluatorControlProto.Builder envelope =
+        EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder();
+    envelope.setTimestamp(System.currentTimeMillis());
+    envelope.setIdentifier(this.evaluatorId);
+    envelope.setContextControl(contextControlProto);
+    this.evaluatorControlHandler.send(envelope.build());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java
new file mode 100644
index 0000000..1fb027a
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextFactory.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Creates the {@link EvaluatorContext} instances of one evaluator and makes sure that no context
+ * id is handed out twice. Used in ContextRepresenters.
+ */
+@ThreadSafe
+final class ContextFactory {
+
+  private final String evaluatorId;
+  private final EvaluatorDescriptor evaluatorDescriptor;
+  private final ConfigurationSerializer configurationSerializer;
+  private final ExceptionCodec exceptionCodec;
+  private final EvaluatorMessageDispatcher messageDispatcher;
+  private final ContextControlHandler contextControlHandler;
+  private final InjectionFuture<ContextRepresenters> contextRepresenters;
+
+  /**
+   * The ids of all contexts created by this factory so far.
+   */
+  @GuardedBy("this.priorIds")
+  private final Set<String> priorIds = new HashSet<>();
+
+  @Inject
+  ContextFactory(final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorId,
+                 final @Parameter(EvaluatorManager.EvaluatorDescriptorName.class) EvaluatorDescriptor evaluatorDescriptor,
+                 final ConfigurationSerializer configurationSerializer,
+                 final ExceptionCodec exceptionCodec,
+                 final EvaluatorMessageDispatcher messageDispatcher,
+                 final ContextControlHandler contextControlHandler,
+                 final InjectionFuture<ContextRepresenters> contextRepresenters) {
+    this.evaluatorId = evaluatorId;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+    this.configurationSerializer = configurationSerializer;
+    this.exceptionCodec = exceptionCodec;
+    this.messageDispatcher = messageDispatcher;
+    this.contextControlHandler = contextControlHandler;
+    this.contextRepresenters = contextRepresenters;
+  }
+
+  /**
+   * Instantiate a new Context representer with the given id and parent id.
+   *
+   * @param contextId the id of the new context; must not have been passed to this factory before.
+   * @param parentID  the id of the parent context, handed to the new context as given.
+   * @return a new Context representer with the given id and parent id.
+   * @throws IllegalStateException if a context with the given id was already created by this factory.
+   */
+  public final EvaluatorContext newContext(final String contextId, final Optional<String> parentID) {
+    synchronized (this.priorIds) {
+      // Set.add() reports whether the id was new, so a single call both checks and records it.
+      final boolean firstUse = this.priorIds.add(contextId);
+      if (!firstUse) {
+        throw new IllegalStateException("Creating second EvaluatorContext instance for id " + contextId);
+      }
+    }
+    final ContextRepresenters representers = this.contextRepresenters.get();
+    return new EvaluatorContext(contextId,
+        this.evaluatorId,
+        this.evaluatorDescriptor,
+        parentID,
+        this.configurationSerializer,
+        this.contextControlHandler,
+        this.messageDispatcher,
+        this.exceptionCodec,
+        representers);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
new file mode 100644
index 0000000..37c7266
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextMessageImpl.java
@@ -0,0 +1,52 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ContextMessage;
+
+/**
+ * Immutable driver-side holder for a message sent by a context: the payload, the id of the
+ * context and the id of the message source.
+ */
+@Private
+@DriverSide
+public final class ContextMessageImpl implements ContextMessage {
+  private final byte[] payload;
+  private final String contextId;
+  private final String sourceId;
+
+  /**
+   * @param theMessage         the message payload; the array is stored as is, without copying.
+   * @param theContextID       the id of the context, returned by {@link #getId()}.
+   * @param theMessageSourceId the id of the message source.
+   */
+  public ContextMessageImpl(final byte[] theMessage, final String theContextID, final String theMessageSourceId) {
+    payload = theMessage;
+    contextId = theContextID;
+    sourceId = theMessageSourceId;
+  }
+
+  /**
+   * @return the payload; this is the same array instance that was given to the constructor.
+   */
+  @Override
+  public byte[] get() {
+    return payload;
+  }
+
+  /**
+   * @return the context id given to the constructor.
+   */
+  @Override
+  public String getId() {
+    return contextId;
+  }
+
+  /**
+   * @return the message source id given to the constructor.
+   */
+  @Override
+  public final String getMessageSourceID() {
+    return sourceId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
new file mode 100644
index 0000000..0ca7ff1
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/ContextRepresenters.java
@@ -0,0 +1,244 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import net.jcip.annotations.GuardedBy;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.util.Optional;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver-Side representation of all contexts on an Evaluator.
+ */
+@ThreadSafe
+@DriverSide
+@Private
+public final class ContextRepresenters {
+ private static final Logger LOG = Logger.getLogger(ContextRepresenters.class.getName());
+
+ private final EvaluatorMessageDispatcher messageDispatcher;
+ private final ContextFactory contextFactory;
+
+ // Mutable fields
+ @GuardedBy("this")
+ private final List<EvaluatorContext> contextStack = new ArrayList<>();
+ @GuardedBy("this")
+ private final Set<String> contextIds = new HashSet<>();
+
+ /**
+ * Stores the collaborators used by this class. The constructor is private and
+ * annotated with {@code @Inject}.
+ *
+ * @param messageDispatcher the dispatcher that context events (active, closed, failed, message) are handed to.
+ * @param contextFactory the factory used to create new EvaluatorContext instances.
+ */
+ @Inject
+ private ContextRepresenters(final EvaluatorMessageDispatcher messageDispatcher,
+ final ContextFactory contextFactory) {
+ this.messageDispatcher = messageDispatcher;
+ this.contextFactory = contextFactory;
+ }
+
+ /**
+  * Look up the representer of the context with the given ID.
+  *
+  * @param contextId the ID of the context to find.
+  * @return the first context on the stack whose ID equals the given one.
+  * @throws RuntimeException if no context on the stack has that ID.
+  */
+ public synchronized EvaluatorContext getContext(final String contextId) {
+   final Iterator<EvaluatorContext> candidates = this.contextStack.iterator();
+   while (candidates.hasNext()) {
+     final EvaluatorContext candidate = candidates.next();
+     if (candidate.getId().equals(contextId)) {
+       return candidate;
+     }
+   }
+   throw new RuntimeException("Unknown evaluator context " + contextId);
+ }
+
+ /**
+  * Create the failed contexts for a FailedEvaluator event.
+  *
+  * @return one FailedContext per context currently on the stack, ordered from the most
+  * recently added context to the oldest one.
+  */
+ public synchronized List<FailedContext> getFailedContextsForEvaluatorFailure() {
+   // Work on a snapshot of the stack and walk it backwards (newest context first).
+   final EvaluatorContext[] snapshot =
+       this.contextStack.toArray(new EvaluatorContext[this.contextStack.size()]);
+   final List<FailedContext> result = new ArrayList<>();
+   for (int i = snapshot.length - 1; i >= 0; --i) {
+     result.add(snapshot[i].getFailedContextForEvaluatorFailure());
+   }
+   return result;
+ }
+
+ /**
+ * Process heartbeats from the contexts on an Evaluator.
+ * The status messages are handled one by one, in iteration order.
+ *
+ * @param contextStatusProtos the context status messages to process.
+ * @param notifyClientOnNewActiveContext forwarded unchanged to the per-message handler.
+ */
+ public synchronized void onContextStatusMessages(final Iterable<ReefServiceProtos.ContextStatusProto> contextStatusProtos,
+ final boolean notifyClientOnNewActiveContext) {
+ for (final ReefServiceProtos.ContextStatusProto contextStatusProto : contextStatusProtos) {
+ this.onContextStatusMessage(contextStatusProto, notifyClientOnNewActiveContext);
+ }
+ }
+
+
+ /**
+ * Process a heartbeat from a single context by dispatching on the state it reports.
+ *
+ * @param contextStatusProto the status message to process.
+ * @param notifyClientOnNewActiveContext only used for READY messages, where it is forwarded
+ * to the READY handler.
+ * @throws RuntimeException if the reported state is neither READY, FAIL nor DONE.
+ */
+ private synchronized void onContextStatusMessage(final ReefServiceProtos.ContextStatusProto contextStatusProto,
+ final boolean notifyClientOnNewActiveContext) {
+
+ LOG.log(Level.FINER, "Processing context status message for context {0}", contextStatusProto.getContextId());
+ switch (contextStatusProto.getContextState()) {
+ case READY:
+ this.onContextReady(contextStatusProto, notifyClientOnNewActiveContext);
+ break;
+ case FAIL:
+ this.onContextFailed(contextStatusProto);
+ break;
+ case DONE:
+ this.onContextDone(contextStatusProto);
+ break;
+ default:
+ // Any other state is treated as an error; the handler below throws.
+ this.onUnknownContextStatus(contextStatusProto);
+ break;
+ }
+ // Only reached when the handler above returned normally.
+ LOG.log(Level.FINER, "Done processing context status message for context {0}", contextStatusProto.getContextId());
+
+ }
+
+
+ /**
+ * Handle a status message whose state is not handled by the dispatching switch:
+ * log a warning with the full message, then fail.
+ *
+ * @param contextStatusProto the offending status message.
+ * @throws RuntimeException always; the exception message names the reported state.
+ */
+ private synchronized void onUnknownContextStatus(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+ LOG.log(Level.WARNING, "Received unexpected context status: {0}", contextStatusProto);
+ throw new RuntimeException("Received unexpected context status: " + contextStatusProto.getContextState());
+ }
+
+ /**
+ * Process a message with status FAIL from a context: remove the context from the
+ * bookkeeping and dispatch a FailedContext event for it.
+ *
+ * @param contextStatusProto the status message reporting the failure.
+ */
+ private synchronized void onContextFailed(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+ assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState());
+ final String contextID = contextStatusProto.getContextId();
+ LOG.log(Level.FINE, "Context {0} failed", contextID);
+ // The context could have failed before any other message from it was seen. Register it
+ // first (passing false for notifyClientOnNewActiveContext) so the lookup below succeeds.
+ if (this.isUnknownContextId(contextID)) {
+ this.onNewContext(contextStatusProto, false);
+ }
+ final EvaluatorContext context = getContext(contextID);
+ // The context is removed before the FailedContext is created and dispatched.
+ this.removeContext(context);
+ this.messageDispatcher.onContextFailed(context.getFailedContext(contextStatusProto));
+ }
+
+ /**
+  * Process a message with status DONE from a context: remove the context from the
+  * bookkeeping and, unless it is the root context, dispatch a ClosedContext event.
+  *
+  * @param contextStatusProto the status message reporting the closed context.
+  * @throws RuntimeException if the context named in the message is unknown.
+  */
+ private synchronized void onContextDone(final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+   assert (ReefServiceProtos.ContextStatusProto.State.DONE == contextStatusProto.getContextState());
+   final String contextID = contextStatusProto.getContextId();
+   if (isUnknownContextId(contextID)) {
+     throw new RuntimeException("Received DONE for context " + contextID + " which is unknown.");
+   }
+
+   LOG.log(Level.FINE, "Context {0} is DONE.", contextID);
+   final EvaluatorContext closed = getContext(contextID);
+   removeContext(closed);
+
+   if (closed.isRootContext()) {
+     // No event is dispatched for the root context.
+     LOG.log(Level.FINE, "Root context {0} closed. Evaluator closed will trigger final shutdown.", contextID);
+     return;
+   }
+
+   final String parentId = closed.getParentId().get();
+   final EvaluatorContext parent = this.getContext(parentId);
+   this.messageDispatcher.onContextClose(closed.getClosedContext(parent));
+ }
+
+ /**
+  * Process a message with status READY from a context: register the context if it is
+  * not known yet, then forward all context messages carried by the status message.
+  *
+  * @param contextStatusProto the READY status message.
+  * @param notifyClientOnNewActiveContext whether or not to inform the application when this in fact
+  * refers to a new context.
+  */
+ private synchronized void onContextReady(final ReefServiceProtos.ContextStatusProto contextStatusProto,
+     final boolean notifyClientOnNewActiveContext) {
+   assert (ReefServiceProtos.ContextStatusProto.State.READY == contextStatusProto.getContextState());
+   final String id = contextStatusProto.getContextId();
+
+   // The first message received from a context also serves as its registration.
+   if (this.isUnknownContextId(id)) {
+     this.onNewContext(contextStatusProto, notifyClientOnNewActiveContext);
+   }
+
+   // Hand every context message in the status message to the dispatcher.
+   for (final ReefServiceProtos.ContextStatusProto.ContextMessageProto msg : contextStatusProto.getContextMessageList()) {
+     this.messageDispatcher.onContextMessage(
+         new ContextMessageImpl(msg.getMessage().toByteArray(), id, msg.getSourceId()));
+   }
+ }
+
+ /**
+  * Create a representer for a context that was unknown so far, add it to the bookkeeping
+  * and, depending on the flags, announce it.
+  *
+  * @param contextStatusProto the message to create the context from.
+  * @param notifyClientOnNewActiveContext whether or not to fire an ActiveContext event; ignored for
+  * messages flagged as recovery, which always fire the driver-restart event.
+  */
+ private synchronized void onNewContext(final ReefServiceProtos.ContextStatusProto contextStatusProto,
+     final boolean notifyClientOnNewActiveContext) {
+   final String newContextId = contextStatusProto.getContextId();
+   LOG.log(Level.FINE, "Adding new context {0}.", newContextId);
+
+   final Optional<String> parentId;
+   if (contextStatusProto.hasParentId()) {
+     parentId = Optional.of(contextStatusProto.getParentId());
+   } else {
+     parentId = Optional.empty();
+   }
+
+   final EvaluatorContext created = contextFactory.newContext(newContextId, parentId);
+   this.addContext(created);
+
+   if (contextStatusProto.getRecovery()) {
+     // when we get a recovered active context, always notify application
+     this.messageDispatcher.OnDriverRestartContextActive(created);
+   } else if (notifyClientOnNewActiveContext) {
+     this.messageDispatcher.onContextActive(created);
+   }
+ }
+
+ /**
+ * Add the given context to the data structures: it is appended to the context stack
+ * and its ID is recorded in the set of known context IDs.
+ *
+ * @param context the context to start tracking.
+ */
+ private synchronized void addContext(final EvaluatorContext context) {
+ this.contextStack.add(context);
+ this.contextIds.add(context.getId());
+ }
+
+ /**
+ * Remove the given context from the data structures: it is taken off the context stack
+ * and its ID is removed from the set of known context IDs.
+ *
+ * @param context the context to stop tracking.
+ */
+ private synchronized void removeContext(final EvaluatorContext context) {
+ this.contextStack.remove(context);
+ this.contextIds.remove(context.getId());
+ }
+
+ /**
+ * Check the set of known context IDs for the given ID.
+ *
+ * @param contextId the context ID to check.
+ * @return true if the given context id is unknown so far.
+ */
+ private synchronized boolean isUnknownContextId(final String contextId) {
+ return !this.contextIds.contains(contextId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
new file mode 100644
index 0000000..e033116
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
@@ -0,0 +1,277 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver-side representation of a Context on an Evaluator.
+ */
+@DriverSide
+@Private
+public final class EvaluatorContext implements ActiveContext {
+
+ private final static Logger LOG = Logger.getLogger(EvaluatorContext.class.getName());
+
+ private final String contextIdentifier;
+ private final String evaluatorIdentifier;
+ private final EvaluatorDescriptor evaluatorDescriptor;
+
+ private final Optional<String> parentID;
+ private final ConfigurationSerializer configurationSerializer;
+ private final ContextControlHandler contextControlHandler;
+ private final ExceptionCodec exceptionCodec;
+ private final ContextRepresenters contextRepresenters;
+
+ private boolean isClosed = false;
+
+ /**
+ * @param contextIdentifier the ID of this context, returned by getId().
+ * @param evaluatorIdentifier the ID of the Evaluator, returned by getEvaluatorId().
+ * @param evaluatorDescriptor the descriptor returned by getEvaluatorDescriptor().
+ * @param parentID the ID of the parent context; empty for a root context.
+ * @param configurationSerializer used to serialize the configurations submitted through this context.
+ * @param contextControlHandler used to send the context control messages built by this class.
+ * @param messageDispatcher not stored or used by this constructor.
+ * @param exceptionCodec used to decode the error carried by a FAIL status message.
+ * @param contextRepresenters used to look up the parent context when creating a FailedContext.
+ */
+ public EvaluatorContext(final String contextIdentifier,
+ final String evaluatorIdentifier,
+ final EvaluatorDescriptor evaluatorDescriptor,
+ final Optional<String> parentID,
+ final ConfigurationSerializer configurationSerializer,
+ final ContextControlHandler contextControlHandler,
+ final EvaluatorMessageDispatcher messageDispatcher,
+ final ExceptionCodec exceptionCodec,
+ final ContextRepresenters contextRepresenters) {
+
+ this.contextIdentifier = contextIdentifier;
+ this.evaluatorIdentifier = evaluatorIdentifier;
+ this.evaluatorDescriptor = evaluatorDescriptor;
+ this.parentID = parentID;
+ this.configurationSerializer = configurationSerializer;
+ this.contextControlHandler = contextControlHandler;
+ this.exceptionCodec = exceptionCodec;
+ this.contextRepresenters = contextRepresenters;
+
+ LOG.log(Level.FINE, "Instantiated 'EvaluatorContext'");
+ }
+
+ /**
+  * Send a remove-context control message for this context and mark this
+  * representer as closed.
+  *
+  * @throws RuntimeException if this context was already closed.
+  */
+ @Override
+ public synchronized void close() {
+   if (this.isClosed) {
+     throw new RuntimeException("Active context already closed");
+   }
+
+   LOG.log(Level.FINEST, "Submit close context: RunningEvaluator id[{0}] for context id[{1}]",
+       new Object[]{getEvaluatorId(), getId()});
+
+   final EvaluatorRuntimeProtocol.RemoveContextProto removal =
+       EvaluatorRuntimeProtocol.RemoveContextProto.newBuilder()
+           .setContextId(getId())
+           .build();
+   final EvaluatorRuntimeProtocol.ContextControlProto envelope =
+       EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+           .setRemoveContext(removal)
+           .build();
+
+   this.contextControlHandler.send(envelope);
+   // Only flagged as closed once the message has been handed to the handler.
+   this.isClosed = true;
+ }
+
+ /**
+  * Send a context message control message addressed to this context.
+  *
+  * @param message the payload; its bytes are copied into the outgoing message.
+  * @throws RuntimeException if this context was already closed.
+  */
+ @Override
+ public synchronized void sendMessage(final byte[] message) {
+   if (this.isClosed) {
+     throw new RuntimeException("Active context already closed");
+   }
+
+   LOG.log(Level.FINEST, "Send message: RunningEvaluator id[{0}] for context id[{1}]",
+       new Object[]{getEvaluatorId(), getId()});
+
+   final EvaluatorRuntimeProtocol.ContextMessageProto payload =
+       EvaluatorRuntimeProtocol.ContextMessageProto.newBuilder()
+           .setContextId(this.contextIdentifier)
+           .setMessage(ByteString.copyFrom(message))
+           .build();
+   final EvaluatorRuntimeProtocol.ContextControlProto envelope =
+       EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+           .setContextMessage(payload)
+           .build();
+
+   this.contextControlHandler.send(envelope);
+ }
+
+ /**
+  * Send a start-task control message for this context.
+  *
+  * @param taskConf the task configuration; it is serialized with the configuration serializer.
+  * @throws RuntimeException if this context was already closed.
+  */
+ @Override
+ public synchronized void submitTask(final Configuration taskConf) {
+   if (this.isClosed) {
+     throw new RuntimeException("Active context already closed");
+   }
+
+   LOG.log(Level.FINEST, "Submit task: RunningEvaluator id[{0}] for context id[{1}]",
+       new Object[]{getEvaluatorId(), getId()});
+
+   final EvaluatorRuntimeProtocol.StartTaskProto startTask =
+       EvaluatorRuntimeProtocol.StartTaskProto.newBuilder()
+           .setContextId(this.contextIdentifier)
+           .setConfiguration(this.configurationSerializer.toString(taskConf))
+           .build();
+   final EvaluatorRuntimeProtocol.ContextControlProto envelope =
+       EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+           .setStartTask(startTask)
+           .build();
+
+   this.contextControlHandler.send(envelope);
+ }
+
+ /**
+  * Send an add-context control message that names this context as the parent.
+  *
+  * @param contextConfiguration the configuration of the new context; it is serialized with
+  * the configuration serializer.
+  * @throws RuntimeException if this context was already closed.
+  */
+ @Override
+ public synchronized void submitContext(final Configuration contextConfiguration) {
+   if (this.isClosed) {
+     throw new RuntimeException("Active context already closed");
+   }
+
+   LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
+       new Object[]{getEvaluatorId(), getId()});
+
+   final EvaluatorRuntimeProtocol.AddContextProto addContext =
+       EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
+           .setParentContextId(getId())
+           .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
+           .build();
+   final EvaluatorRuntimeProtocol.ContextControlProto envelope =
+       EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+           .setAddContext(addContext)
+           .build();
+
+   this.contextControlHandler.send(envelope);
+ }
+
+ /**
+  * Send an add-context control message that names this context as the parent and
+  * carries a service configuration in addition to the context configuration.
+  *
+  * @param contextConfiguration the configuration of the new context.
+  * @param serviceConfiguration the service configuration sent along with it.
+  * @throws RuntimeException if this context was already closed.
+  */
+ @Override
+ public synchronized void submitContextAndService(
+     final Configuration contextConfiguration, final Configuration serviceConfiguration) {
+   if (this.isClosed) {
+     throw new RuntimeException("Active context already closed");
+   }
+
+   LOG.log(Level.FINEST, "Submit new context: RunningEvaluator id[{0}] for context id[{1}]",
+       new Object[]{getEvaluatorId(), getId()});
+
+   final EvaluatorRuntimeProtocol.AddContextProto addContext =
+       EvaluatorRuntimeProtocol.AddContextProto.newBuilder()
+           .setParentContextId(getId())
+           .setContextConfiguration(this.configurationSerializer.toString(contextConfiguration))
+           .setServiceConfiguration(this.configurationSerializer.toString(serviceConfiguration))
+           .build();
+   final EvaluatorRuntimeProtocol.ContextControlProto envelope =
+       EvaluatorRuntimeProtocol.ContextControlProto.newBuilder()
+           .setAddContext(addContext)
+           .build();
+
+   this.contextControlHandler.send(envelope);
+ }
+
+ /**
+ * @return the Evaluator identifier given to the constructor.
+ */
+ @Override
+ public String getEvaluatorId() {
+ return this.evaluatorIdentifier;
+ }
+
+ /**
+ * @return the parent context ID given to the constructor; not present for a root context.
+ */
+ @Override
+ public Optional<String> getParentId() {
+ return this.parentID;
+ }
+
+ /**
+ * @return the Evaluator descriptor given to the constructor.
+ */
+ @Override
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return this.evaluatorDescriptor;
+ }
+
+ /**
+ * @return the context identifier given to the constructor.
+ */
+ @Override
+ public String getId() {
+ return this.contextIdentifier;
+ }
+
+ /**
+  * @return a description listing the context ID, the Evaluator ID and the parent ID.
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder("EvaluatorContext{");
+   text.append("contextIdentifier='").append(this.contextIdentifier).append('\'');
+   text.append(", evaluatorIdentifier='").append(this.evaluatorIdentifier).append('\'');
+   text.append(", parentID=").append(this.parentID).append('}');
+   return text.toString();
+ }
+
+ /**
+ * Create the ClosedContext representation of this context.
+ *
+ * @param parentContext the context handed to the ClosedContext as its parent.
+ * @return a new ClosedContext carrying this context's ID, Evaluator ID and Evaluator descriptor.
+ */
+ public synchronized final ClosedContext getClosedContext(final ActiveContext parentContext) {
+ return new ClosedContextImpl(
+ parentContext, this.getId(), this.getEvaluatorId(), this.getEvaluatorDescriptor());
+ }
+
+ /**
+  * Create the FailedContext that represents this context when its Evaluator failed.
+  * The result has the message "Evaluator Failure" and no description, cause or data.
+  *
+  * @return a FailedContext for the case of an EvaluatorFailure.
+  */
+ public synchronized FailedContext getFailedContextForEvaluatorFailure() {
+   // Resolve the parent representer, if this context has a parent.
+   final Optional<ActiveContext> parent;
+   if (getParentId().isPresent()) {
+     parent = Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get()));
+   } else {
+     parent = Optional.<ActiveContext>empty();
+   }
+
+   final Optional<String> noDescription = Optional.empty();
+   final Optional<Throwable> noCause = Optional.empty();
+   final Optional<byte[]> noData = Optional.empty();
+
+   return new FailedContextImpl(
+       this.getId(), "Evaluator Failure", noDescription, noCause, noData,
+       parent, getEvaluatorDescriptor(), getEvaluatorId());
+ }
+
+ /**
+ * Create the FailedContext for a FAIL status message received from this context.
+ *
+ * @param contextStatusProto the FAIL status message; its error field, if set, supplies the
+ * data and (after decoding) the cause of the failure.
+ * @return a FailedContext for this context.
+ */
+ public synchronized FailedContext getFailedContext(
+ final ReefServiceProtos.ContextStatusProto contextStatusProto) {
+
+ assert (ReefServiceProtos.ContextStatusProto.State.FAIL == contextStatusProto.getContextState());
+
+ final String id = this.getId();
+ final Optional<String> description = Optional.empty();
+
+ // The raw serialized error, if the message carries one.
+ final Optional<byte[]> data = contextStatusProto.hasError() ?
+ Optional.of(contextStatusProto.getError().toByteArray()) :
+ Optional.<byte[]>empty();
+
+ // Decoding is only attempted when there are bytes; the codec's result may still be empty.
+ final Optional<Throwable> cause = data.isPresent() ?
+ this.exceptionCodec.fromBytes(data) :
+ Optional.<Throwable>empty();
+
+ // NOTE(review): getMessage() of the decoded Throwable may itself be null - confirm that callers cope.
+ final String message = cause.isPresent() ? cause.get().getMessage() : "No message given";
+
+ final Optional<ActiveContext> parentContext = getParentId().isPresent() ?
+ Optional.<ActiveContext>of(this.contextRepresenters.getContext(getParentId().get())) :
+ Optional.<ActiveContext>empty();
+
+ final EvaluatorDescriptor evaluatorDescriptor = getEvaluatorDescriptor();
+ final String evaluatorID = getEvaluatorId();
+
+ return new FailedContextImpl(
+ id, message, description, cause, data, parentContext, evaluatorDescriptor, evaluatorID);
+ }
+
+ /**
+ * @return true if this context has no parent ID, i.e. it is a root context.
+ */
+ public synchronized boolean isRootContext() {
+ return !this.parentID.isPresent();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java
new file mode 100644
index 0000000..98e3db4
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/FailedContextImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.context;
+
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.common.AbstractFailure;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.util.Optional;
+
+/**
+ * Driver-side representation of a failed context. In addition to the failure details
+ * kept by AbstractFailure, it records the parent context (if any) and the Evaluator
+ * the context failed on.
+ */
+@Private
+@DriverSide
+public final class FailedContextImpl extends AbstractFailure implements FailedContext {
+
+  private final Optional<ActiveContext> parentContext;
+  private final EvaluatorDescriptor evaluatorDescriptor;
+  private final String evaluatorID;
+
+  /**
+   * @param id                  Identifier of the entity that produced the error.
+   * @param message             One-line error message.
+   * @param description         Long error description.
+   * @param cause               Java Exception that caused the error.
+   * @param data                byte array that contains serialized version of the error.
+   * @param parentContext       the parent context, if there is one.
+   * @param evaluatorDescriptor the descriptor of the Evaluator this context failed on.
+   * @param evaluatorID         the id of the Evaluator this context failed on.
+   */
+  public FailedContextImpl(final String id,
+                           final String message,
+                           final Optional<String> description,
+                           final Optional<Throwable> cause,
+                           final Optional<byte[]> data,
+                           final Optional<ActiveContext> parentContext,
+                           final EvaluatorDescriptor evaluatorDescriptor,
+                           final String evaluatorID) {
+    super(id, message, description, cause, data);
+    this.parentContext = parentContext;
+    this.evaluatorDescriptor = evaluatorDescriptor;
+    this.evaluatorID = evaluatorID;
+  }
+
+  /**
+   * @return the parent context given to the constructor.
+   */
+  @Override
+  public Optional<ActiveContext> getParentContext() {
+    return this.parentContext;
+  }
+
+  /**
+   * @return the id of the Evaluator this context failed on.
+   */
+  @Override
+  public String getEvaluatorId() {
+    return this.evaluatorID;
+  }
+
+  /**
+   * @return the ID of the parent context if there is one, empty otherwise.
+   */
+  @Override
+  public Optional<String> getParentId() {
+    final Optional<ActiveContext> parent = this.getParentContext();
+    return parent.isPresent() ? Optional.of(parent.get().getId()) : Optional.<String>empty();
+  }
+
+  /**
+   * @return the descriptor of the Evaluator this context failed on.
+   */
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorDescriptor;
+  }
+
+  /**
+   * @return a description listing the Evaluator ID and the context ID.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("FailedContext{");
+    text.append("evaluatorID='").append(this.evaluatorID);
+    text.append("', contextID='").append(getId()).append("'}");
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java
new file mode 100644
index 0000000..24704b4
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementations of the Driver-side representations of Contexts running on an Evaluator.
+ */
+@DriverSide
+@Private package org.apache.reef.runtime.common.driver.context;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
new file mode 100644
index 0000000..76733e6
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseHandler.java
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.util.ThreadLogger;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for close messages from the client: logs a warning that includes the
+ * formatted thread list, then throws an exception with the same text.
+ */
+public final class DefaultClientCloseHandler implements EventHandler<Void> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultClientCloseHandler.class.getName());
+
+  /**
+   * Prefix handed to ThreadLogger when formatting the thread list.
+   */
+  private static final String NO_HANDLER_PREFIX =
+      "Received a close message from the client, but no handler was bound for it. Active threads: ";
+
+  @Inject
+  DefaultClientCloseHandler() {
+  }
+
+  /**
+   * @param aVoid unused.
+   * @throws RuntimeException always.
+   */
+  @Override
+  public void onNext(final Void aVoid) {
+    final String report = ThreadLogger.getFormattedThreadList(NO_HANDLER_PREFIX);
+    LOG.log(Level.WARNING, report);
+    throw new RuntimeException(report);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java
new file mode 100644
index 0000000..c3d7c71
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientCloseWithMessageHandler.java
@@ -0,0 +1,36 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default handler for close-with-message events from the client: throws an exception
+ * whose message contains the decoded client message.
+ */
+public final class DefaultClientCloseWithMessageHandler implements EventHandler<byte[]> {
+
+  @Inject
+  public DefaultClientCloseWithMessageHandler() {
+  }
+
+  /**
+   * @param bytes the message sent along with the close request.
+   * @throws RuntimeException always; the text includes the message decoded as UTF-8.
+   */
+  @Override
+  public void onNext(final byte[] bytes) {
+    // Decode with an explicit charset: new String(byte[]) uses the JVM's platform default,
+    // which makes the text depend on the machine the Driver runs on.
+    // NOTE(review): assumes clients encode their messages as UTF-8 - confirm.
+    throw new RuntimeException(
+        "No handler bound for client Close With Message event: "
+            + new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java
new file mode 100644
index 0000000..2cdf000
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultClientMessageHandler.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for Client messages: logs the decoded message at INFO level.
+ */
+public final class DefaultClientMessageHandler implements EventHandler<byte[]> {
+
+  private static final Logger LOG = Logger.getLogger(DefaultClientMessageHandler.class.getName());
+
+  @Inject
+  public DefaultClientMessageHandler() {
+  }
+
+  /**
+   * @param bytes the message received from the client; logged after decoding it as UTF-8.
+   */
+  @Override
+  public void onNext(final byte[] bytes) {
+    // Decode with an explicit charset: new String(byte[]) uses the JVM's platform default,
+    // which makes the logged text depend on the machine the Driver runs on.
+    // NOTE(review): assumes clients encode their messages as UTF-8 - confirm.
+    LOG.log(Level.INFO, "Received ClientMessage: {0}",
+        new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java
new file mode 100644
index 0000000..002935c
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextActiveHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for ActiveContext events: logs the received context at INFO level and then closes it.
+ */
+public final class DefaultContextActiveHandler implements EventHandler<ActiveContext> {
+  private static final Logger LOG = Logger.getLogger(DefaultContextActiveHandler.class.getName());
+  private static final String LOG_FORMAT = "Received ActiveContext: {0} :: CLOSING";
+
+  @Inject
+  private DefaultContextActiveHandler() {
+  }
+
+  @Override
+  public void onNext(final ActiveContext activeContext) {
+    LOG.log(Level.INFO, LOG_FORMAT, activeContext);
+    activeContext.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java
new file mode 100644
index 0000000..7d23169
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextClosureHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for ClosedContext: logs it and, when a parent context exists, logs and closes that parent.
+ */
+public final class DefaultContextClosureHandler implements EventHandler<ClosedContext> {
+  private static final Logger LOG = Logger.getLogger(DefaultContextClosureHandler.class.getName());
+
+  @Inject
+  public DefaultContextClosureHandler() {
+  }
+
+  @Override
+  public void onNext(final ClosedContext closedContext) {
+    LOG.log(Level.INFO, "Received ClosedContext: {0}", closedContext);
+    if (closedContext.getParentContext() == null) {
+      return; // no parent context to close
+    }
+    LOG.log(Level.INFO, "Closing parent context: {0}", closedContext.getParentContext().getId());
+    closedContext.getParentContext().close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java
new file mode 100644
index 0000000..884a865
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextFailureHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default event handler used for FailedContext: it always throws a RuntimeException,
+ * chaining the failure reason as the cause when one is present.
+ */
+public final class DefaultContextFailureHandler implements EventHandler<FailedContext> {
+
+  @Inject
+  public DefaultContextFailureHandler() {
+  }
+
+  @Override
+  public void onNext(final FailedContext failedContext) {
+    if (!failedContext.getReason().isPresent()) {
+      throw new RuntimeException("No handler bound for FailedContext:" + failedContext);
+    }
+    throw new RuntimeException(
+        "No handler bound for FailedContext:" + failedContext, failedContext.getReason().get());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java
new file mode 100644
index 0000000..7b5e4dd
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultContextMessageHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for ContextMessage: writes the received message to the log at INFO level.
+ */
+public final class DefaultContextMessageHandler implements EventHandler<ContextMessage> {
+  private static final Logger LOG = Logger.getLogger(DefaultContextMessageHandler.class.getName());
+  private static final String LOG_FORMAT = "Received ContextMessage: {0}";
+
+  @Inject
+  public DefaultContextMessageHandler() {
+  }
+
+  @Override
+  public void onNext(final ContextMessage contextMessage) {
+    LOG.log(Level.INFO, LOG_FORMAT, contextMessage);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java
new file mode 100644
index 0000000..3825d99
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartCompletedHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for the driver restart completed event: logs the event's timestamp at INFO level.
+ */
+public final class DefaultDriverRestartCompletedHandler implements EventHandler<DriverRestartCompleted> {
+  private static final Logger LOG = Logger.getLogger(DefaultDriverRestartCompletedHandler.class.getName());
+
+  @Inject
+  private DefaultDriverRestartCompletedHandler() {
+  }
+
+  @Override
+  public void onNext(final DriverRestartCompleted restartCompleted) {
+    final Object timeStamp = restartCompleted.getTimeStamp();
+    LOG.log(Level.INFO, "Driver restart completed at time [{0}].", timeStamp);
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java
new file mode 100644
index 0000000..477ee01
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartContextActiveHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for an ActiveContext from a previous evaluator during driver restart: logs it, then closes it.
+ */
+public final class DefaultDriverRestartContextActiveHandler implements EventHandler<ActiveContext> {
+  private static final Logger LOG = Logger.getLogger(DefaultDriverRestartContextActiveHandler.class.getName());
+
+  @Inject
+  private DefaultDriverRestartContextActiveHandler() {
+  }
+
+  @Override
+  public void onNext(final ActiveContext activeContext) {
+    LOG.log(Level.INFO,
+        "Received ActiveContext running on previous Evaluator during driver restart: {0} :: CLOSING", activeContext);
+    activeContext.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java
new file mode 100644
index 0000000..176a20a
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultDriverRestartTaskRunningHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for a RunningTask received during driver restart: it does not log,
+ * it throws a RuntimeException reporting that no DriverRestartTaskRunningHandler is bound.
+ */
+public final class DefaultDriverRestartTaskRunningHandler implements EventHandler<RunningTask> {
+
+  @Inject
+  public DefaultDriverRestartTaskRunningHandler() {
+  }
+
+  @Override
+  public void onNext(final RunningTask runningTask) {
+    // Plain concatenation (no explicit toString()) so a null task still yields this exception, not an NPE.
+    throw new RuntimeException(
+        "RunningTask [" + runningTask + "] received during driver restart, but no DriverRestartTaskRunningHandler is bound");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java
new file mode 100644
index 0000000..8e5bb74
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorAllocationHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default handler for AllocatedEvaluator: logs the evaluator at INFO level and then closes it.
+ */
+public final class DefaultEvaluatorAllocationHandler implements EventHandler<AllocatedEvaluator> {
+  // Named after this class (not DefaultContextMessageHandler) so log records are attributed correctly.
+  private static final Logger LOG = Logger.getLogger(DefaultEvaluatorAllocationHandler.class.getName());
+
+  @Inject
+  public DefaultEvaluatorAllocationHandler() {
+  }
+
+  @Override
+  public void onNext(final AllocatedEvaluator allocatedEvaluator) {
+    LOG.log(Level.INFO, "Received AllocatedEvaluator: {0} :: CLOSING", allocatedEvaluator);
+    allocatedEvaluator.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java
new file mode 100644
index 0000000..5a387c9
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorCompletionHandler.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for CompletedEvaluator: logs the completed evaluator at INFO level.
+ */
+public final class DefaultEvaluatorCompletionHandler implements EventHandler<CompletedEvaluator> {
+  // Named after this class (not DefaultContextMessageHandler) so log records are attributed correctly.
+  private static final Logger LOG = Logger.getLogger(DefaultEvaluatorCompletionHandler.class.getName());
+
+  @Inject
+  public DefaultEvaluatorCompletionHandler() {
+  }
+
+  @Override
+  public void onNext(final CompletedEvaluator completedEvaluator) {
+    LOG.log(Level.INFO, "Received CompletedEvaluator: {0}", completedEvaluator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java
new file mode 100644
index 0000000..34ff7b1
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultEvaluatorFailureHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Default event handler used for FailedEvaluator: throws a RuntimeException caused by the evaluator's exception.
+ */
+public final class DefaultEvaluatorFailureHandler implements EventHandler<FailedEvaluator> {
+
+  @Inject
+  public DefaultEvaluatorFailureHandler() {
+  }
+
+  @Override
+  public void onNext(final FailedEvaluator failedEvaluator) {
+    final String message = "No handler bound for FailedEvaluator: " + failedEvaluator;
+    final Throwable cause = failedEvaluator.getEvaluatorException();
+    throw new RuntimeException(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/50444bba/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java
----------------------------------------------------------------------
diff --git a/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java
new file mode 100644
index 0000000..4dcc462
--- /dev/null
+++ b/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/DefaultTaskCompletionHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2014 Microsoft Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default event handler for CompletedTask: logs the task together with its
+ * active context at INFO level, then closes that context.
+ */
+public final class DefaultTaskCompletionHandler implements EventHandler<CompletedTask> {
+  private static final Logger LOG = Logger.getLogger(DefaultTaskCompletionHandler.class.getName());
+
+  @Inject
+  public DefaultTaskCompletionHandler() {
+  }
+
+  @Override
+  public void onNext(final CompletedTask completedTask) {
+    final ActiveContext taskContext = completedTask.getActiveContext();
+    final Object[] logArguments = {completedTask, taskContext};
+    LOG.log(Level.INFO, "Received CompletedTask: {0} :: CLOSING context: {1}", logArguments);
+    taskContext.close();
+  }
+}