You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:11 UTC
[38/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java
new file mode 100644
index 0000000..cc60934
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/defaults/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Default implementations for the optional driver-side event handlers.
+ */
+package org.apache.reef.runtime.common.driver.defaults;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
new file mode 100644
index 0000000..462d6c9
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ContextConfiguration;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.LoggingScope;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Driver-side representation of an allocated Evaluator.
+ * <p>
+ * Collects the files and libraries to ship to the Evaluator and, on any of the {@code submit*}
+ * calls, assembles the Evaluator configuration and hands a
+ * {@link DriverRuntimeProtocol.ResourceLaunchProto} to the {@link EvaluatorManager}.
+ */
+@DriverSide
+@Private
+final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
+
+  private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorImpl.class.getName());
+
+  private final EvaluatorManager evaluatorManager;
+  private final String remoteID;
+  private final ConfigurationSerializer configurationSerializer;
+  private final String jobIdentifier;
+  private final LoggingScopeFactory loggingScopeFactory;
+
+  /**
+   * The set of files to be placed on the Evaluator (shipped as {@code FileType.PLAIN}).
+   */
+  private final Collection<File> files = new HashSet<>();
+
+  /**
+   * The set of library files to be placed on the Evaluator (shipped as {@code FileType.LIB}).
+   */
+  private final Collection<File> libraries = new HashSet<>();
+
+  /**
+   * @param evaluatorManager        the manager of the Evaluator this object represents.
+   * @param remoteID                the remote identifier of the Driver, passed to the Evaluator.
+   * @param configurationSerializer used to serialize the configurations sent to the Evaluator.
+   * @param jobIdentifier           the application identifier, passed to the Evaluator.
+   * @param loggingScopeFactory     used to time the Evaluator launch.
+   */
+  AllocatedEvaluatorImpl(final EvaluatorManager evaluatorManager,
+                         final String remoteID,
+                         final ConfigurationSerializer configurationSerializer,
+                         final String jobIdentifier,
+                         final LoggingScopeFactory loggingScopeFactory) {
+    this.evaluatorManager = evaluatorManager;
+    this.remoteID = remoteID;
+    this.configurationSerializer = configurationSerializer;
+    this.jobIdentifier = jobIdentifier;
+    this.loggingScopeFactory = loggingScopeFactory;
+  }
+
+  @Override
+  public String getId() {
+    return this.evaluatorManager.getId();
+  }
+
+  @Override
+  public void close() {
+    this.evaluatorManager.close();
+  }
+
+  /**
+   * Submits the given task inside a new root context whose identifier is
+   * {@code "RootContext_" + getId()}.
+   *
+   * @param taskConfiguration the configuration of the task to run.
+   */
+  @Override
+  public void submitTask(final Configuration taskConfiguration) {
+    final Configuration contextConfiguration = ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
+        .build();
+    this.submitContextAndTask(contextConfiguration, taskConfiguration);
+  }
+
+  @Override
+  public EvaluatorDescriptor getEvaluatorDescriptor() {
+    return this.evaluatorManager.getEvaluatorDescriptor();
+  }
+
+  @Override
+  public void submitContext(final Configuration contextConfiguration) {
+    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.<Configuration>empty());
+  }
+
+  @Override
+  public void submitContextAndService(final Configuration contextConfiguration,
+                                      final Configuration serviceConfiguration) {
+    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.<Configuration>empty());
+  }
+
+  @Override
+  public void submitContextAndTask(final Configuration contextConfiguration,
+                                   final Configuration taskConfiguration) {
+    launch(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration));
+  }
+
+  @Override
+  public void submitContextAndServiceAndTask(final Configuration contextConfiguration,
+                                             final Configuration serviceConfiguration,
+                                             final Configuration taskConfiguration) {
+    launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
+  }
+
+  @Override
+  public void setType(final EvaluatorType type) {
+    this.evaluatorManager.setType(type);
+  }
+
+  @Override
+  public void addFile(final File file) {
+    this.files.add(file);
+  }
+
+  @Override
+  public void addLibrary(final File file) {
+    this.libraries.add(file);
+  }
+
+  /**
+   * Builds the launch request for this Evaluator and passes it to the EvaluatorManager.
+   *
+   * @param contextConfiguration the root context configuration.
+   * @param serviceConfiguration the optional root service configuration.
+   * @param taskConfiguration    the optional task configuration.
+   * @throws RuntimeException wrapping a {@link BindException} if the Evaluator configuration
+   *                          cannot be built.
+   */
+  private void launch(final Configuration contextConfiguration,
+                      final Optional<Configuration> serviceConfiguration,
+                      final Optional<Configuration> taskConfiguration) {
+    try (final LoggingScope lb = this.loggingScopeFactory.evaluatorLaunch(this.getId())) {
+      try {
+        final Configuration evaluatorConfiguration =
+            makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration);
+
+        final DriverRuntimeProtocol.ResourceLaunchProto.Builder rbuilder =
+            DriverRuntimeProtocol.ResourceLaunchProto.newBuilder()
+                .setIdentifier(this.evaluatorManager.getId())
+                .setRemoteId(this.remoteID)
+                .setEvaluatorConf(this.configurationSerializer.toString(evaluatorConfiguration));
+
+        for (final File file : this.files) {
+          rbuilder.addFile(makeFileResourceProto(file, ReefServiceProtos.FileType.PLAIN));
+        }
+        for (final File lib : this.libraries) {
+          rbuilder.addFile(makeFileResourceProto(lib, ReefServiceProtos.FileType.LIB));
+        }
+        rbuilder.setType(getProcessType());
+
+        this.evaluatorManager.onResourceLaunch(rbuilder.build());
+
+      } catch (final BindException ex) {
+        LOG.log(Level.SEVERE, "Bad Evaluator configuration", ex);
+        throw new RuntimeException("Bad Evaluator configuration", ex);
+      }
+    }
+  }
+
+  /**
+   * Assembles the Evaluator configuration from the job, Driver-remote and Evaluator identifiers,
+   * the serialized root context configuration and, when present, the serialized root service and
+   * task configurations.
+   *
+   * @throws BindException if the resulting ConfigurationModule cannot be built.
+   */
+  private Configuration makeEvaluatorConfiguration(final Configuration contextConfiguration,
+                                                   final Optional<Configuration> serviceConfiguration,
+                                                   final Optional<Configuration> taskConfiguration)
+      throws BindException {
+    ConfigurationModule module = EvaluatorConfiguration.CONF
+        .set(EvaluatorConfiguration.APPLICATION_IDENTIFIER, this.jobIdentifier)
+        .set(EvaluatorConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteID)
+        .set(EvaluatorConfiguration.EVALUATOR_IDENTIFIER, this.getId());
+
+    final String encodedContextConfiguration = this.configurationSerializer.toString(contextConfiguration);
+
+    // The root service configuration is optional.
+    if (serviceConfiguration.isPresent()) {
+      module = module.set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION,
+          this.configurationSerializer.toString(serviceConfiguration.get()));
+    }
+    module = module.set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, encodedContextConfiguration);
+
+    // The task configuration is optional as well.
+    if (taskConfiguration.isPresent()) {
+      module = module.set(EvaluatorConfiguration.TASK_CONFIGURATION,
+          this.configurationSerializer.toString(taskConfiguration.get()));
+    }
+    return module.build();
+  }
+
+  /**
+   * Describes {@code file} as a resource of the given type to be shipped to the Evaluator.
+   */
+  private static ReefServiceProtos.FileResourceProto makeFileResourceProto(final File file,
+                                                                          final ReefServiceProtos.FileType type) {
+    return ReefServiceProtos.FileResourceProto.newBuilder()
+        .setName(file.getName())
+        .setPath(file.getPath())
+        .setType(type)
+        .build();
+  }
+
+  /**
+   * Maps the Evaluator's type to the process type to launch: CLR for CLR Evaluators, JVM otherwise.
+   */
+  private ReefServiceProtos.ProcessType getProcessType() {
+    switch (this.evaluatorManager.getEvaluatorDescriptor().getType()) {
+      case CLR:
+        return ReefServiceProtos.ProcessType.CLR;
+      default:
+        return ReefServiceProtos.ProcessType.JVM;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "AllocatedEvaluator{ID='" + getId() + "\'}";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java
new file mode 100644
index 0000000..78e4446
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/CompletedEvaluatorImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+
+/**
+ * {@link CompletedEvaluator} implementation that carries only the identifier of the Evaluator
+ * that completed.
+ */
+@DriverSide
+@Private
+final class CompletedEvaluatorImpl implements CompletedEvaluator {
+
+  /** Identifier of the completed Evaluator; fixed at construction. */
+  private final String id;
+
+  /**
+   * @param id identifier of the completed Evaluator.
+   */
+  CompletedEvaluatorImpl(final String id) {
+    this.id = id;
+  }
+
+  @Override
+  public String getId() {
+    return this.id;
+  }
+
+  @Override
+  public String toString() {
+    return "CompletedEvaluator{id='" + this.id + "'}";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
new file mode 100644
index 0000000..ce62027
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Sends Evaluator control messages to a single Evaluator.
+ * <p>
+ * Messages can only be sent after the Evaluator's remote ID has been registered via
+ * {@link #setRemoteID(String)}, and only while the {@link EvaluatorStatusManager} reports the
+ * Evaluator as running.
+ */
+@DriverSide
+@Private
+public final class EvaluatorControlHandler {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorControlHandler.class.getName());
+
+  private final EvaluatorStatusManager stateManager;
+  private final RemoteManager remoteManager;
+  private final String evaluatorId;
+
+  /**
+   * Handler delivering messages to the Evaluator; empty until {@link #setRemoteID(String)} is called.
+   * Guarded by {@code this}.
+   */
+  private Optional<EventHandler<EvaluatorRuntimeProtocol.EvaluatorControlProto>> wrapped = Optional.empty();
+
+  /**
+   * @param stateManager  used to check whether the Evaluator is running before sending a message.
+   * @param remoteManager used to establish the communications link as soon as the remote ID has been set.
+   * @param evaluatorId   the identifier of the Evaluator this handler sends messages to.
+   */
+  @Inject
+  EvaluatorControlHandler(final EvaluatorStatusManager stateManager,
+                          final RemoteManager remoteManager,
+                          final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorId) {
+    this.stateManager = stateManager;
+    this.remoteManager = remoteManager;
+    this.evaluatorId = evaluatorId;
+    LOG.log(Level.FINE, "Instantiated 'EvaluatorControlHandler'");
+  }
+
+  /**
+   * Send the evaluatorControlProto to the Evaluator.
+   *
+   * @param evaluatorControlProto the control message to deliver.
+   * @throws java.lang.IllegalStateException if the remote ID hasn't been set via setRemoteID() prior to this call.
+   * @throws java.lang.IllegalStateException if the Evaluator isn't running.
+   */
+  public synchronized void send(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
+    if (!this.wrapped.isPresent()) {
+      throw new IllegalStateException("Trying to send an EvaluatorControlProto to Evaluator ["
+          + this.evaluatorId + "] before its remote ID is set.");
+    }
+    if (!this.stateManager.isRunning()) {
+      final String msg = new StringBuilder()
+          .append("Trying to send an EvaluatorControlProto to Evaluator [")
+          .append(this.evaluatorId)
+          .append("] that is in state [")
+          .append(this.stateManager.toString())
+          .append("], not [RUNNING]. The control message was: ")
+          .append(evaluatorControlProto.toString())
+          .toString();
+      throw new IllegalStateException(msg);
+    }
+    this.wrapped.get().onNext(evaluatorControlProto);
+  }
+
+  /**
+   * Set the remote ID used to communicate with this Evaluator.
+   *
+   * @param evaluatorRID the remote identifier of the Evaluator.
+   * @throws java.lang.IllegalStateException if the remote ID has been set before.
+   */
+  synchronized void setRemoteID(final String evaluatorRID) {
+    if (this.wrapped.isPresent()) {
+      throw new IllegalStateException("Trying to reset the remote ID of Evaluator ["
+          + this.evaluatorId + "]. This isn't supported.");
+    }
+    LOG.log(Level.FINE, "Registering remoteId [{0}] for Evaluator [{1}]", new Object[]{evaluatorRID, evaluatorId});
+    this.wrapped = Optional.of(
+        this.remoteManager.getHandler(evaluatorRID, EvaluatorRuntimeProtocol.EvaluatorControlProto.class));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
new file mode 100644
index 0000000..1a0bbcc
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorDescriptorImpl.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+
+/**
+ * A simple, data-only implementation of {@link EvaluatorDescriptor}.
+ * <p>
+ * The node, memory and core count are fixed at construction. The type can only be changed while
+ * it is still {@link EvaluatorType#UNDECIDED}.
+ */
+@Private
+@DriverSide
+final class EvaluatorDescriptorImpl implements EvaluatorDescriptor {
+
+  private final NodeDescriptor nodeDescriptor;
+  private final int megaBytes;
+  private final int numberOfCores;
+
+  /** The Evaluator type; guarded by {@code this}. */
+  private EvaluatorType type;
+
+  /**
+   * @param nodeDescriptor the node the Evaluator runs on.
+   * @param type           the initial type of the Evaluator.
+   * @param megaBytes      the memory of the Evaluator, reported by {@link #getMemory()}.
+   * @param numberOfCores  the number of cores of the Evaluator.
+   */
+  public EvaluatorDescriptorImpl(final NodeDescriptor nodeDescriptor,
+                                 final EvaluatorType type,
+                                 final int megaBytes,
+                                 final int numberOfCores) {
+    this.nodeDescriptor = nodeDescriptor;
+    this.type = type;
+    this.megaBytes = megaBytes;
+    this.numberOfCores = numberOfCores;
+  }
+
+  @Override
+  public NodeDescriptor getNodeDescriptor() {
+    return this.nodeDescriptor;
+  }
+
+  @Override
+  public synchronized EvaluatorType getType() {
+    return this.type;
+  }
+
+  /**
+   * Sets the type of this Evaluator.
+   *
+   * @param type the new type.
+   * @throws IllegalStateException if the current type is not {@link EvaluatorType#UNDECIDED}.
+   */
+  public synchronized void setType(final EvaluatorType type) {
+    if (this.type != EvaluatorType.UNDECIDED) {
+      throw new IllegalStateException("Unable to change state of an Evaluator of Type: " + this.type);
+    }
+    this.type = type;
+  }
+
+  @Override
+  public int getMemory() {
+    return this.megaBytes;
+  }
+
+  @Override
+  public int getNumberOfCores() {
+    return this.numberOfCores;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java
new file mode 100644
index 0000000..09d8088
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartBeatSanityChecker.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Sanity checks for Evaluator heartbeats: rejects a heartbeat whose timestamp is older than the
+ * latest timestamp already seen from the same Evaluator.
+ */
+@DriverSide
+@Private
+final class EvaluatorHeartBeatSanityChecker {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorHeartBeatSanityChecker.class.getName());
+
+  /** Latest heartbeat timestamp seen per Evaluator id; guarded by {@code this}. */
+  private final Map<String, Long> knownTimeStamps = new HashMap<>();
+
+  @Inject
+  EvaluatorHeartBeatSanityChecker() {
+  }
+
+  /**
+   * Checks that {@code timeStamp} is not older than the last one recorded for Evaluator {@code id},
+   * then records it as the latest.
+   *
+   * @param id        the Evaluator identifier.
+   * @param timeStamp the timestamp of the heartbeat just received.
+   * @throws RuntimeException if a heartbeat with a later timestamp was already recorded for {@code id};
+   *                          in that case the stored timestamp is left unchanged.
+   */
+  synchronized void check(final String id, final long timeStamp) {
+    // Values are only ever stored from a primitive long, so null means "no previous heartbeat".
+    final Long oldTimeStamp = this.knownTimeStamps.get(id);
+    if (oldTimeStamp != null) {
+      // Avoid building the message on every heartbeat unless FINEST logging is actually enabled.
+      if (LOG.isLoggable(Level.FINEST)) {
+        LOG.log(Level.FINEST, "TIMESTAMP CHECKER: id [ " + id + " ], old timestamp [ " + oldTimeStamp
+            + " ], new timestamp [ " + timeStamp + " ]");
+      }
+      if (oldTimeStamp > timeStamp) {
+        final String msg = "Received an old heartbeat with timestamp `" + timeStamp +
+            "` while earlier receiving one with timestamp `" + oldTimeStamp + "`";
+        LOG.log(Level.SEVERE, msg);
+        throw new RuntimeException(msg);
+      }
+    }
+    this.knownTimeStamps.put(id, timeStamp);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
new file mode 100644
index 0000000..c90098f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorHeartbeatHandler.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Receives heartbeats from all Evaluators and dispatches them to the right EvaluatorManager instance.
+ */
+@Private
+@DriverSide
+public final class EvaluatorHeartbeatHandler implements EventHandler<RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto>> {
+ private static final Logger LOG = Logger.getLogger(EvaluatorHeartbeatHandler.class.getName());
+ private final Evaluators evaluators;
+ private final EvaluatorManagerFactory evaluatorManagerFactory;
+
+ @Inject
+ EvaluatorHeartbeatHandler(final Evaluators evaluators, final EvaluatorManagerFactory evaluatorManagerFactory) {
+ this.evaluators = evaluators;
+ this.evaluatorManagerFactory = evaluatorManagerFactory;
+ }
+
+ @Override
+ public void onNext(final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatMessage) {
+ final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto heartbeat = evaluatorHeartbeatMessage.getMessage();
+ final ReefServiceProtos.EvaluatorStatusProto status = heartbeat.getEvaluatorStatus();
+ final String evaluatorId = status.getEvaluatorId();
+
+ LOG.log(Level.FINEST, "TIME: Begin Heartbeat {0}", evaluatorId);
+ LOG.log(Level.FINEST, "Heartbeat from Evaluator {0} with state {1} timestamp {2} from remoteId {3}",
+ new Object[]{evaluatorId, status.getState(), heartbeat.getTimestamp(), evaluatorHeartbeatMessage.getIdentifier()});
+
+ final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId);
+ if (evaluatorManager.isPresent()) {
+ evaluatorManager.get().onEvaluatorHeartbeatMessage(evaluatorHeartbeatMessage);
+ } else {
+ final StringBuilder message = new StringBuilder("Contact from unknown Evaluator with identifier '");
+ message.append(evaluatorId);
+ if (heartbeat.hasEvaluatorStatus()) {
+ message.append("' with state '");
+ message.append(status.getState());
+ }
+ message.append('\'');
+ throw new RuntimeException(message.toString());
+ }
+ LOG.log(Level.FINEST, "TIME: End Heartbeat {0}", evaluatorId);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
new file mode 100644
index 0000000..3938e06
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -0,0 +1,529 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.exception.EvaluatorKilledByResourceManagerException;
+import org.apache.reef.io.naming.Identifiable;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
+import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
+import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
+import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
+import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
+import org.apache.reef.runtime.common.driver.task.TaskRepresenter;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Name;
+import org.apache.reef.tang.annotations.NamedParameter;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.event.Alarm;
+
import javax.inject.Inject;
import java.io.File;
import java.util.List;
import java.util.Locale;
import java.util.logging.Level;
import java.util.logging.Logger;
+
+/**
+ * Manages a single Evaluator instance including all lifecycle instances:
+ * (AllocatedEvaluator, CompletedEvaluator, FailedEvaluator).
+ * <p/>
+ * A (periodic) heartbeat channel is established EvaluatorRuntime -> EvaluatorManager.
+ * The EvaluatorRuntime will (periodically) send (status) messages to the EvaluatorManager using this
+ * heartbeat channel.
+ * <p/>
+ * A (push-based) EventHandler channel is established EvaluatorManager -> EvaluatorRuntime.
+ * The EvaluatorManager uses this to forward Driver messages, launch Tasks, and initiate
+ * control information (e.g., shutdown, suspend).
+ */
+@Private
+@DriverSide
+public final class EvaluatorManager implements Identifiable, AutoCloseable {
+
+ private final static Logger LOG = Logger.getLogger(EvaluatorManager.class.getName());
+
+ private final EvaluatorHeartBeatSanityChecker sanityChecker = new EvaluatorHeartBeatSanityChecker();
+ private final Clock clock;
+ private final ResourceReleaseHandler resourceReleaseHandler;
+ private final ResourceLaunchHandler resourceLaunchHandler;
+ private final String evaluatorId;
+ private final EvaluatorDescriptorImpl evaluatorDescriptor;
+ private final ContextRepresenters contextRepresenters;
+ private final EvaluatorMessageDispatcher messageDispatcher;
+ private final EvaluatorControlHandler evaluatorControlHandler;
+ private final ContextControlHandler contextControlHandler;
+ private final EvaluatorStatusManager stateManager;
+ private final ExceptionCodec exceptionCodec;
+ private final DriverStatusManager driverStatusManager;
+ private final EventHandlerIdlenessSource idlenessSource;
+ private final LoggingScopeFactory loggingScopeFactory;
+
+
+ // Mutable fields
+ private Optional<TaskRepresenter> task = Optional.empty();
+ private boolean isResourceReleased = false;
+
+ @Inject
+ private EvaluatorManager(
+ final Clock clock,
+ final RemoteManager remoteManager,
+ final ResourceReleaseHandler resourceReleaseHandler,
+ final ResourceLaunchHandler resourceLaunchHandler,
+ final @Parameter(EvaluatorIdentifier.class) String evaluatorId,
+ final @Parameter(EvaluatorDescriptorName.class) EvaluatorDescriptorImpl evaluatorDescriptor,
+ final ContextRepresenters contextRepresenters,
+ final ConfigurationSerializer configurationSerializer,
+ final EvaluatorMessageDispatcher messageDispatcher,
+ final EvaluatorControlHandler evaluatorControlHandler,
+ final ContextControlHandler contextControlHandler,
+ final EvaluatorStatusManager stateManager,
+ final DriverStatusManager driverStatusManager,
+ final ExceptionCodec exceptionCodec,
+ final EventHandlerIdlenessSource idlenessSource,
+ final LoggingScopeFactory loggingScopeFactory) {
+ this.contextRepresenters = contextRepresenters;
+ this.idlenessSource = idlenessSource;
+ LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
+ this.clock = clock;
+ this.resourceReleaseHandler = resourceReleaseHandler;
+ this.resourceLaunchHandler = resourceLaunchHandler;
+ this.evaluatorId = evaluatorId;
+ this.evaluatorDescriptor = evaluatorDescriptor;
+
+ this.messageDispatcher = messageDispatcher;
+ this.evaluatorControlHandler = evaluatorControlHandler;
+ this.contextControlHandler = contextControlHandler;
+ this.stateManager = stateManager;
+ this.driverStatusManager = driverStatusManager;
+ this.exceptionCodec = exceptionCodec;
+ this.loggingScopeFactory = loggingScopeFactory;
+
+ final AllocatedEvaluator allocatedEvaluator =
+ new AllocatedEvaluatorImpl(this, remoteManager.getMyIdentifier(), configurationSerializer, getJobIdentifier(), loggingScopeFactory);
+ LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
+ this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
+ LOG.log(Level.FINEST, "Instantiated 'EvaluatorManager' for evaluator: [{0}]", this.getId());
+ }
+
+ /**
+ * Get the id of current job/application
+ */
+ public static String getJobIdentifier() {
+ // TODO: currently we obtain the job id directly by parsing execution (container) directory path
+ // #845 is open to get the id from RM properly
+ for (File directory = new File(System.getProperty("user.dir"));
+ directory != null; directory = directory.getParentFile()) {
+ final String currentDirectoryName = directory.getName();
+ if (currentDirectoryName.toLowerCase().contains("application_")) {
+ return currentDirectoryName;
+ }
+ }
+ // cannot find a directory that contains application_, presumably we are on local runtime
+ // again, this is a hack for now, we need #845 as a proper solution
+ return "REEF_LOCAL_RUNTIME";
+ }
+
+ private static boolean isDoneOrFailedOrKilled(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+ return resourceStatusProto.getState() == ReefServiceProtos.State.DONE ||
+ resourceStatusProto.getState() == ReefServiceProtos.State.FAILED ||
+ resourceStatusProto.getState() == ReefServiceProtos.State.KILLED;
+ }
+
+ @Override
+ public String getId() {
+ return this.evaluatorId;
+ }
+
+ public void setType(final EvaluatorType type) {
+ this.evaluatorDescriptor.setType(type);
+ }
+
+ public EvaluatorDescriptor getEvaluatorDescriptor() {
+ return this.evaluatorDescriptor;
+ }
+
+ @Override
+ public void close() {
+ synchronized (this.evaluatorDescriptor) {
+ if (this.stateManager.isRunning()) {
+ LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
+ try {
+ // Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
+ final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto =
+ EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
+ .setTimestamp(System.currentTimeMillis())
+ .setIdentifier(getId())
+ .setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
+ .build();
+ sendEvaluatorControlMessage(evaluatorControlProto);
+ } finally {
+ this.stateManager.setKilled();
+ }
+ }
+
+
+ if (!this.isResourceReleased) {
+ this.isResourceReleased = true;
+ try {
+ /* We need to wait awhile before returning the container to the RM in order to
+ * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */
+ this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+ @Override
+ public void onNext(final Alarm alarm) {
+ EvaluatorManager.this.resourceReleaseHandler.onNext(
+ DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
+ .setIdentifier(EvaluatorManager.this.evaluatorId).build()
+ );
+ }
+ });
+ } catch (final IllegalStateException e) {
+ LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
+ EvaluatorManager.this.resourceReleaseHandler.onNext(
+ DriverRuntimeProtocol.ResourceReleaseProto.newBuilder()
+ .setIdentifier(EvaluatorManager.this.evaluatorId).build()
+ );
+ }
+ }
+ }
+ this.idlenessSource.check();
+ }
+
+ /**
+ * Return true if the state is DONE, FAILED, or KILLED,
+ * <em>and</em> there are no messages queued or in processing.
+ */
+ public boolean isClosed() {
+ return this.messageDispatcher.isEmpty() &&
+ (this.stateManager.isDoneOrFailedOrKilled());
+ }
+
+ /**
+ * EvaluatorException will trigger is FailedEvaluator and state transition to FAILED
+ *
+ * @param exception on the EvaluatorRuntime
+ */
+ public void onEvaluatorException(final EvaluatorException exception) {
+ synchronized (this.evaluatorDescriptor) {
+ if (this.stateManager.isDoneOrFailedOrKilled()) {
+ LOG.log(Level.FINE, "Ignoring an exception receivedfor Evaluator {0} which is already in state {1}.",
+ new Object[]{this.getId(), this.stateManager});
+ return;
+ }
+
+ LOG.log(Level.WARNING, "Failed evaluator: " + getId(), exception);
+
+ try {
+
+ final List<FailedContext> failedContextList = this.contextRepresenters.getFailedContextsForEvaluatorFailure();
+
+ final Optional<FailedTask> failedTaskOptional;
+ if (this.task.isPresent()) {
+ final String taskId = this.task.get().getId();
+ final Optional<ActiveContext> evaluatorContext = Optional.empty();
+ final Optional<byte[]> bytes = Optional.empty();
+ final Optional<Throwable> taskException = Optional.<Throwable>of(new Exception("Evaluator crash"));
+ final String message = "Evaluator crash";
+ final Optional<String> description = Optional.empty();
+ final FailedTask failedTask = new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
+ failedTaskOptional = Optional.of(failedTask);
+ } else {
+ failedTaskOptional = Optional.empty();
+ }
+
+
+ this.messageDispatcher.onEvaluatorFailed(new FailedEvaluatorImpl(exception, failedContextList, failedTaskOptional, this.evaluatorId));
+
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Exception while handling FailedEvaluator", e);
+ } finally {
+ this.stateManager.setFailed();
+ close();
+ }
+ }
+ }
+
+ public synchronized void onEvaluatorHeartbeatMessage(
+ final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) {
+
+ final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
+ evaluatorHeartbeatProtoRemoteMessage.getMessage();
+ LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
+
+ if (this.stateManager.isDoneOrFailedOrKilled()) {
+ LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.",
+ new Object[]{this.getId(), this.stateManager});
+ return;
+ }
+
+ this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp());
+ final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
+
+ // first message from a running evaluator trying to re-establish communications
+ if (evaluatorHeartbeatProto.getRecovery()) {
+ this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+ this.stateManager.setRunning();
+
+ this.driverStatusManager.oneContainerRecovered();
+ final int numRecoveredContainers = this.driverStatusManager.getNumRecoveredContainers();
+
+ LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId);
+ final int expectedEvaluatorsNumber = this.driverStatusManager.getNumPreviousContainers();
+
+ if (numRecoveredContainers > expectedEvaluatorsNumber) {
+ LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but [{1}] evaluators have checked in.",
+ new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+ throw new RuntimeException("More then expected number of evaluators are checking in during recovery.");
+ } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
+ LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. Recovery completed.", expectedEvaluatorsNumber);
+ this.driverStatusManager.setRestartCompleted();
+ this.messageDispatcher.OnDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis()));
+ } else {
+ LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] evaluators have checked in.",
+ new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+ }
+ }
+
+ // If this is the first message from this Evaluator, register it.
+ if (this.stateManager.isSubmitted()) {
+ this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+ this.stateManager.setRunning();
+ LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
+ }
+
+ // Process the Evaluator status message
+ if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
+ this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus());
+ }
+
+ // Process the Context status message(s)
+ final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus();
+ this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(),
+ informClientOfNewContexts);
+
+ // Process the Task status message
+ if (evaluatorHeartbeatProto.hasTaskStatus()) {
+ this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus());
+ }
+ LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
+ }
+
+ /**
+ * Process a evaluator status message.
+ *
+ * @param message
+ */
+ private synchronized void onEvaluatorStatusMessage(final ReefServiceProtos.EvaluatorStatusProto message) {
+
+ switch (message.getState()) {
+ case DONE:
+ this.onEvaluatorDone(message);
+ break;
+ case FAILED:
+ this.onEvaluatorFailed(message);
+ break;
+ case INIT:
+ case KILLED:
+ case RUNNING:
+ case SUSPEND:
+ break;
+ }
+ }
+
+ /**
+ * Process an evaluator message that indicates that the evaluator shut down cleanly.
+ *
+ * @param message
+ */
+ private synchronized void onEvaluatorDone(final ReefServiceProtos.EvaluatorStatusProto message) {
+ assert (message.getState() == ReefServiceProtos.State.DONE);
+ LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
+ this.stateManager.setDone();
+ this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
+ close();
+ }
+
+ /**
+ * Process an evaluator message that indicates a crash.
+ *
+ * @param evaluatorStatusProto
+ */
+ private synchronized void onEvaluatorFailed(final ReefServiceProtos.EvaluatorStatusProto evaluatorStatusProto) {
+ assert (evaluatorStatusProto.getState() == ReefServiceProtos.State.FAILED);
+ final EvaluatorException evaluatorException;
+ if (evaluatorStatusProto.hasError()) {
+ final Optional<Throwable> exception = this.exceptionCodec.fromBytes(evaluatorStatusProto.getError().toByteArray());
+ if (exception.isPresent()) {
+ evaluatorException = new EvaluatorException(getId(), exception.get());
+ } else {
+ evaluatorException = new EvaluatorException(getId(), new Exception("Exception sent, but can't be deserialized"));
+ }
+ } else {
+ evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
+ }
+ onEvaluatorException(evaluatorException);
+ }
+
+ public void onResourceLaunch(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) {
+ synchronized (this.evaluatorDescriptor) {
+ if (this.stateManager.isAllocated()) {
+ this.stateManager.setSubmitted();
+ this.resourceLaunchHandler.onNext(resourceLaunchProto);
+ } else {
+ throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
+ " state but instead is in state " + this.stateManager);
+ }
+ }
+ }
+
+ /**
+ * Packages the ContextControlProto in an EvaluatorControlProto and forward it to the EvaluatorRuntime
+ *
+ * @param contextControlProto message contains context control info.
+ */
+ public void sendContextControlMessage(final EvaluatorRuntimeProtocol.ContextControlProto contextControlProto) {
+ synchronized (this.evaluatorDescriptor) {
+ LOG.log(Level.FINEST, "Context control message to {0}", this.evaluatorId);
+ this.contextControlHandler.send(contextControlProto);
+ }
+ }
+
+ /**
+ * Forward the EvaluatorControlProto to the EvaluatorRuntime
+ *
+ * @param evaluatorControlProto message contains evaluator control information.
+ */
+ void sendEvaluatorControlMessage(final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto) {
+ synchronized (this.evaluatorDescriptor) {
+ this.evaluatorControlHandler.send(evaluatorControlProto);
+ }
+ }
+
+ /**
+ * Handle task status messages.
+ *
+ * @param taskStatusProto message contains the current task status.
+ */
+ private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+
+ if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatusProto.getTaskId()))) {
+ if (taskStatusProto.getState() == ReefServiceProtos.State.INIT ||
+ taskStatusProto.getState() == ReefServiceProtos.State.FAILED ||
+ taskStatusProto.getRecovery() // for task from recovered evaluators
+ ) {
+
+ // FAILED is a legal first state of a Task as it could have failed during construction.
+ this.task = Optional.of(
+ new TaskRepresenter(taskStatusProto.getTaskId(),
+ this.contextRepresenters.getContext(taskStatusProto.getContextId()),
+ this.messageDispatcher,
+ this,
+ this.exceptionCodec));
+ } else {
+ throw new RuntimeException("Received an message of state " + taskStatusProto.getState() +
+ ", not INIT or FAILED for Task " + taskStatusProto.getTaskId() + " which we haven't heard from before.");
+ }
+ }
+ this.task.get().onTaskStatusMessage(taskStatusProto);
+
+ if (this.task.get().isNotRunning()) {
+ LOG.log(Level.FINEST, "Task no longer running. De-registering it.");
+ this.task = Optional.empty();
+ }
+ }
+
+ /**
+ * Resource status information from the (actual) resource manager.
+ */
+ public void onResourceStatusMessage(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+ synchronized (this.evaluatorDescriptor) {
+ LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusProto.getState());
+ if (this.stateManager.isDoneOrFailedOrKilled()) {
+ LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
+ new Object[]{this.getId(), this.stateManager});
+ } else if (isDoneOrFailedOrKilled(resourceStatusProto) && this.stateManager.isAllocatedOrSubmittedOrRunning()) {
+ // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes
+ // it to be alive.
+ final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
+ .append(this.evaluatorId)
+ .append("] is assumed to be in state [")
+ .append(this.stateManager.toString())
+ .append("]. But the resource manager reports it to be in state [")
+ .append(resourceStatusProto.getState())
+ .append("].");
+
+ if (this.stateManager.isSubmitted()) {
+ messageBuilder
+ .append(" This most likely means that the Evaluator suffered a failure before establishing a communications link to the driver.");
+ } else if (this.stateManager.isAllocated()) {
+ messageBuilder.append(" This most likely means that the Evaluator suffered a failure before being used.");
+ } else if (this.stateManager.isRunning()) {
+ messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message back to the driver.");
+ }
+ if (this.task.isPresent()) {
+ messageBuilder.append(" Task [")
+ .append(this.task.get().getId())
+ .append("] was running when the Evaluator crashed.");
+ }
+ this.isResourceReleased = true;
+
+ if (resourceStatusProto.getState() == ReefServiceProtos.State.KILLED) {
+ this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString()));
+ } else {
+ this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "EvaluatorManager:"
+ + " id=" + this.evaluatorId
+ + " state=" + this.stateManager
+ + " task=" + this.task;
+ }
+
+ // Dynamic Parameters
+ @NamedParameter(doc = "The Evaluator Identifier.")
+ public final static class EvaluatorIdentifier implements Name<String> {
+ }
+
+ @NamedParameter(doc = "The Evaluator Host.")
+ public final static class EvaluatorDescriptorName implements Name<EvaluatorDescriptorImpl> {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
new file mode 100644
index 0000000..d6e7757
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.driver.catalog.ResourceCatalog;
+import org.apache.reef.driver.evaluator.EvaluatorType;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorHandler;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.exceptions.BindException;
+import org.apache.reef.tang.exceptions.InjectionException;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Helper class that creates new EvaluatorManager instances from alloations.
+ */
+@Private
+@DriverSide
+public final class EvaluatorManagerFactory {
+ private static final Logger LOG = Logger.getLogger(EvaluatorManagerFactory.class.getName());
+
+ private final Injector injector;
+ private final ResourceCatalog resourceCatalog;
+
+ @Inject
+ EvaluatorManagerFactory(final Injector injector, final ResourceCatalog resourceCatalog, final NodeDescriptorHandler nodeDescriptorHandler) {
+ this.injector = injector;
+ this.resourceCatalog = resourceCatalog;
+ }
+
+ /**
+ * Helper method to create a new EvaluatorManager instance
+ *
+ * @param id identifier of the Evaluator
+ * @param desc NodeDescriptor on which the Evaluator executes.
+ * @return a new EvaluatorManager instance.
+ */
+ private final EvaluatorManager getNewEvaluatorManagerInstance(final String id, final EvaluatorDescriptorImpl desc) {
+ LOG.log(Level.FINEST, "Creating Evaluator Manager for Evaluator ID {0}", id);
+ final Injector child = this.injector.forkInjector();
+
+ try {
+ child.bindVolatileParameter(EvaluatorManager.EvaluatorIdentifier.class, id);
+ child.bindVolatileParameter(EvaluatorManager.EvaluatorDescriptorName.class, desc);
+ } catch (final BindException e) {
+ throw new RuntimeException("Unable to bind evaluator identifier and name.", e);
+ }
+
+ final EvaluatorManager result;
+ try {
+ result = child.getInstance(EvaluatorManager.class);
+ } catch (final InjectionException e) {
+ throw new RuntimeException("Unable to instantiate a new EvaluatorManager for Evaluator ID: " + id, e);
+ }
+ return result;
+ }
+
+ /**
+ * Instantiates a new EvaluatorManager based on a resource allocation.
+ *
+ * @param resourceAllocationProto
+ * @return
+ */
+ public final EvaluatorManager getNewEvaluatorManager(final DriverRuntimeProtocol.ResourceAllocationProto resourceAllocationProto) {
+ final NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationProto.getNodeId());
+
+ if (nodeDescriptor == null) {
+ throw new RuntimeException("Unknown resource: " + resourceAllocationProto.getNodeId());
+ }
+ final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor,
+ EvaluatorType.UNDECIDED, resourceAllocationProto.getResourceMemory(), resourceAllocationProto.getVirtualCores());
+
+ LOG.log(Level.FINEST, "Resource allocation: new evaluator id[{0}]", resourceAllocationProto.getIdentifier());
+ return this.getNewEvaluatorManagerInstance(resourceAllocationProto.getIdentifier(), evaluatorDescriptor);
+ }
+
+ public final EvaluatorManager createForEvaluatorFailedDuringDriverRestart(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+ if (!resourceStatusProto.getIsFromPreviousDriver()) {
+ throw new RuntimeException("Invalid resourceStatusProto, must be status for resource from previous Driver.");
+ }
+ return getNewEvaluatorManagerInstance(resourceStatusProto.getIdentifier(), new EvaluatorDescriptorImpl(null, EvaluatorType.UNDECIDED, 128, 1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
new file mode 100644
index 0000000..709cd30
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.context.ClosedContext;
+import org.apache.reef.driver.context.ContextMessage;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.CompletedEvaluator;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.parameters.*;
+import org.apache.reef.driver.task.*;
+import org.apache.reef.runtime.common.DriverRestartCompleted;
+import org.apache.reef.runtime.common.driver.DriverExceptionHandler;
+import org.apache.reef.runtime.common.utils.DispatchingEStage;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Central dispatcher for all Evaluator related events. This exists once per Evaluator.
+ * <p>
+ * Every event is handed to two groups of handlers: the service-provided ones and the
+ * application-provided ones. Service handlers are always enqueued before application handlers,
+ * both for regular events ({@code dispatch}) and for Driver-restart specific events
+ * ({@code dispatchForRestartedDriver}).
+ * <p>
+ * NOTE(review): the application and Driver-restart dispatchers are constructed from the service
+ * dispatcher and presumably share its worker threads; with more than one thread configured
+ * ({@code EvaluatorDispatcherThreads}) enqueue order need not equal execution order.
+ */
+public final class EvaluatorMessageDispatcher {
+
+  private static final Logger LOG = Logger.getLogger(EvaluatorMessageDispatcher.class.getName());
+
+  /**
+   * Dispatcher used for application provided event handlers.
+   */
+  private final DispatchingEStage applicationDispatcher;
+
+  /**
+   * Dispatcher used for service provided event handlers.
+   */
+  private final DispatchingEStage serviceDispatcher;
+
+  /**
+   * Dispatcher used for application provided driver-restart specific event handlers.
+   */
+  private final DispatchingEStage driverRestartApplicationDispatcher;
+
+  /**
+   * Dispatcher used for service provided driver-restart specific event handlers.
+   */
+  private final DispatchingEStage driverRestartServiceDispatcher;
+
+  /**
+   * Creates the four dispatchers and registers every injected handler set with the matching one.
+   * The handler-set parameters are grouped (and commented) by origin: application vs. service,
+   * and regular vs. Driver-restart specific.
+   *
+   * @param numberOfThreads        thread count passed to the service dispatcher's stage
+   * @param evaluatorIdentifier    id of the Evaluator this dispatcher serves, passed to the stage
+   * @param driverExceptionHandler exception handler passed to the service dispatcher's stage
+   */
+  @Inject
+  EvaluatorMessageDispatcher(
+      // Application-provided Context event handlers
+      final @Parameter(ContextActiveHandlers.class) Set<EventHandler<ActiveContext>> contextActiveHandlers,
+      final @Parameter(ContextClosedHandlers.class) Set<EventHandler<ClosedContext>> contextClosedHandlers,
+      final @Parameter(ContextFailedHandlers.class) Set<EventHandler<FailedContext>> contextFailedHandlers,
+      final @Parameter(ContextMessageHandlers.class) Set<EventHandler<ContextMessage>> contextMessageHandlers,
+      // Service-provided Context event handlers
+      final @Parameter(ServiceContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceContextActiveHandlers,
+      final @Parameter(ServiceContextClosedHandlers.class) Set<EventHandler<ClosedContext>> serviceContextClosedHandlers,
+      final @Parameter(ServiceContextFailedHandlers.class) Set<EventHandler<FailedContext>> serviceContextFailedHandlers,
+      final @Parameter(ServiceContextMessageHandlers.class) Set<EventHandler<ContextMessage>> serviceContextMessageHandlers,
+      // Application-provided Task event handlers
+      final @Parameter(TaskRunningHandlers.class) Set<EventHandler<RunningTask>> taskRunningHandlers,
+      final @Parameter(TaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> taskCompletedHandlers,
+      final @Parameter(TaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> taskSuspendedHandlers,
+      final @Parameter(TaskMessageHandlers.class) Set<EventHandler<TaskMessage>> taskMessageEventHandlers,
+      final @Parameter(TaskFailedHandlers.class) Set<EventHandler<FailedTask>> taskExceptionEventHandlers,
+      // Service-provided Task event handlers
+      final @Parameter(ServiceTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceTaskRunningEventHandlers,
+      final @Parameter(ServiceTaskCompletedHandlers.class) Set<EventHandler<CompletedTask>> serviceTaskCompletedEventHandlers,
+      final @Parameter(ServiceTaskSuspendedHandlers.class) Set<EventHandler<SuspendedTask>> serviceTaskSuspendedEventHandlers,
+      final @Parameter(ServiceTaskMessageHandlers.class) Set<EventHandler<TaskMessage>> serviceTaskMessageEventHandlers,
+      final @Parameter(ServiceTaskFailedHandlers.class) Set<EventHandler<FailedTask>> serviceTaskExceptionEventHandlers,
+      // Application-provided Evaluator event handlers
+      final @Parameter(EvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> evaluatorAllocatedHandlers,
+      final @Parameter(EvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> evaluatorFailedHandlers,
+      final @Parameter(EvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> evaluatorCompletedHandlers,
+      // Service-provided Evaluator event handlers
+      final @Parameter(ServiceEvaluatorAllocatedHandlers.class) Set<EventHandler<AllocatedEvaluator>> serviceEvaluatorAllocatedEventHandlers,
+      final @Parameter(ServiceEvaluatorFailedHandlers.class) Set<EventHandler<FailedEvaluator>> serviceEvaluatorFailedHandlers,
+      final @Parameter(ServiceEvaluatorCompletedHandlers.class) Set<EventHandler<CompletedEvaluator>> serviceEvaluatorCompletedHandlers,
+
+      // Application event handlers specific to a Driver restart
+      final @Parameter(DriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> driverRestartTaskRunningHandlers,
+      final @Parameter(DriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> driverRestartActiveContextHandlers,
+      final @Parameter(DriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> driverRestartCompletedHandlers,
+
+      // Service-provided event handlers specific to a Driver restart
+      final @Parameter(ServiceDriverRestartTaskRunningHandlers.class) Set<EventHandler<RunningTask>> serviceDriverRestartTaskRunningHandlers,
+      final @Parameter(ServiceDriverRestartContextActiveHandlers.class) Set<EventHandler<ActiveContext>> serviceDriverRestartActiveContextHandlers,
+      final @Parameter(ServiceDriverRestartCompletedHandlers.class) Set<EventHandler<DriverRestartCompleted>> serviceDriverRestartCompletedHandlers,
+
+      final @Parameter(EvaluatorDispatcherThreads.class) int numberOfThreads,
+      final @Parameter(EvaluatorManager.EvaluatorIdentifier.class) String evaluatorIdentifier,
+      final DriverExceptionHandler driverExceptionHandler) {
+
+    // Only the service dispatcher creates a stage; the others are derived from it.
+    this.serviceDispatcher = new DispatchingEStage(driverExceptionHandler, numberOfThreads, evaluatorIdentifier);
+    this.applicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
+    this.driverRestartApplicationDispatcher = new DispatchingEStage(this.serviceDispatcher);
+    this.driverRestartServiceDispatcher = new DispatchingEStage(this.serviceDispatcher);
+
+    { // Application Context event handlers
+      this.applicationDispatcher.register(ActiveContext.class, contextActiveHandlers);
+      this.applicationDispatcher.register(ClosedContext.class, contextClosedHandlers);
+      this.applicationDispatcher.register(FailedContext.class, contextFailedHandlers);
+      this.applicationDispatcher.register(ContextMessage.class, contextMessageHandlers);
+    }
+    { // Service Context event handlers
+      this.serviceDispatcher.register(ActiveContext.class, serviceContextActiveHandlers);
+      this.serviceDispatcher.register(ClosedContext.class, serviceContextClosedHandlers);
+      this.serviceDispatcher.register(FailedContext.class, serviceContextFailedHandlers);
+      this.serviceDispatcher.register(ContextMessage.class, serviceContextMessageHandlers);
+    }
+    { // Application Task event handlers.
+      this.applicationDispatcher.register(RunningTask.class, taskRunningHandlers);
+      this.applicationDispatcher.register(CompletedTask.class, taskCompletedHandlers);
+      this.applicationDispatcher.register(SuspendedTask.class, taskSuspendedHandlers);
+      this.applicationDispatcher.register(TaskMessage.class, taskMessageEventHandlers);
+      this.applicationDispatcher.register(FailedTask.class, taskExceptionEventHandlers);
+    }
+    { // Service Task event handlers
+      this.serviceDispatcher.register(RunningTask.class, serviceTaskRunningEventHandlers);
+      this.serviceDispatcher.register(CompletedTask.class, serviceTaskCompletedEventHandlers);
+      this.serviceDispatcher.register(SuspendedTask.class, serviceTaskSuspendedEventHandlers);
+      this.serviceDispatcher.register(TaskMessage.class, serviceTaskMessageEventHandlers);
+      this.serviceDispatcher.register(FailedTask.class, serviceTaskExceptionEventHandlers);
+    }
+    { // Application Evaluator event handlers
+      this.applicationDispatcher.register(FailedEvaluator.class, evaluatorFailedHandlers);
+      this.applicationDispatcher.register(CompletedEvaluator.class, evaluatorCompletedHandlers);
+      this.applicationDispatcher.register(AllocatedEvaluator.class, evaluatorAllocatedHandlers);
+    }
+    { // Service Evaluator event handlers
+      this.serviceDispatcher.register(FailedEvaluator.class, serviceEvaluatorFailedHandlers);
+      this.serviceDispatcher.register(CompletedEvaluator.class, serviceEvaluatorCompletedHandlers);
+      this.serviceDispatcher.register(AllocatedEvaluator.class, serviceEvaluatorAllocatedEventHandlers);
+    }
+
+    // Application event handlers specific to a Driver restart
+    {
+      this.driverRestartApplicationDispatcher.register(RunningTask.class, driverRestartTaskRunningHandlers);
+      this.driverRestartApplicationDispatcher.register(ActiveContext.class, driverRestartActiveContextHandlers);
+      this.driverRestartApplicationDispatcher.register(DriverRestartCompleted.class, driverRestartCompletedHandlers);
+    }
+
+    // Service event handlers specific to a Driver restart
+    {
+      this.driverRestartServiceDispatcher.register(RunningTask.class, serviceDriverRestartTaskRunningHandlers);
+      this.driverRestartServiceDispatcher.register(ActiveContext.class, serviceDriverRestartActiveContextHandlers);
+      this.driverRestartServiceDispatcher.register(DriverRestartCompleted.class, serviceDriverRestartCompletedHandlers);
+    }
+    LOG.log(Level.FINE, "Instantiated 'EvaluatorMessageDispatcher'");
+  }
+
+  // ---- Regular events: service handlers first, then application handlers. ----
+
+  public void onEvaluatorAllocated(final AllocatedEvaluator allocatedEvaluator) {
+    this.dispatch(AllocatedEvaluator.class, allocatedEvaluator);
+  }
+
+  public void onEvaluatorFailed(final FailedEvaluator failedEvaluator) {
+    this.dispatch(FailedEvaluator.class, failedEvaluator);
+  }
+
+  public void onEvaluatorCompleted(final CompletedEvaluator completedEvaluator) {
+    this.dispatch(CompletedEvaluator.class, completedEvaluator);
+  }
+
+  public void onTaskRunning(final RunningTask runningTask) {
+    this.dispatch(RunningTask.class, runningTask);
+  }
+
+  public void onTaskCompleted(final CompletedTask completedTask) {
+    this.dispatch(CompletedTask.class, completedTask);
+  }
+
+  public void onTaskSuspended(final SuspendedTask suspendedTask) {
+    this.dispatch(SuspendedTask.class, suspendedTask);
+  }
+
+  public void onTaskMessage(final TaskMessage taskMessage) {
+    this.dispatch(TaskMessage.class, taskMessage);
+  }
+
+  public void onTaskFailed(final FailedTask failedTask) {
+    this.dispatch(FailedTask.class, failedTask);
+  }
+
+  public void onContextActive(final ActiveContext activeContext) {
+    this.dispatch(ActiveContext.class, activeContext);
+  }
+
+  public void onContextClose(final ClosedContext closedContext) {
+    this.dispatch(ClosedContext.class, closedContext);
+  }
+
+  public void onContextFailed(final FailedContext failedContext) {
+    this.dispatch(FailedContext.class, failedContext);
+  }
+
+  public void onContextMessage(final ContextMessage contextMessage) {
+    this.dispatch(ContextMessage.class, contextMessage);
+  }
+
+  // ---- Driver-restart events: same ordering, routed to the restart dispatchers. ----
+
+  public void onDriverRestartTaskRunning(final RunningTask runningTask) {
+    this.dispatchForRestartedDriver(RunningTask.class, runningTask);
+  }
+
+  /**
+   * Dispatches an ActiveContext reported while the Driver restarts.
+   * NOTE(review): capitalized name is non-standard Java; kept because renaming breaks callers.
+   */
+  public void OnDriverRestartContextActive(final ActiveContext activeContext) {
+    this.dispatchForRestartedDriver(ActiveContext.class, activeContext);
+  }
+
+  /**
+   * Dispatches the DriverRestartCompleted event.
+   * NOTE(review): capitalized name is non-standard Java; kept because renaming breaks callers.
+   */
+  public void OnDriverRestartCompleted(final DriverRestartCompleted restartCompleted) {
+    this.dispatchForRestartedDriver(DriverRestartCompleted.class, restartCompleted);
+  }
+
+  /**
+   * @return the result of {@code isEmpty()} on the application dispatcher.
+   * NOTE(review): only the application dispatcher is queried; presumably it shares its stage
+   * with the other dispatchers - confirm in DispatchingEStage.
+   */
+  boolean isEmpty() {
+    return this.applicationDispatcher.isEmpty();
+  }
+
+  /**
+   * Enqueues {@code message} for the service handlers, then for the application handlers.
+   */
+  private <T, U extends T> void dispatch(final Class<T> type, final U message) {
+    this.serviceDispatcher.onNext(type, message);
+    this.applicationDispatcher.onNext(type, message);
+  }
+
+  /**
+   * Enqueues a Driver-restart event for the service handlers, then for the application handlers.
+   * The order matches {@link #dispatch}, so services see restart events before the application
+   * does, exactly as for regular events.
+   */
+  private <T, U extends T> void dispatchForRestartedDriver(final Class<T> type, final U message) {
+    this.driverRestartServiceDispatcher.onNext(type, message);
+    this.driverRestartApplicationDispatcher.onNext(type, message);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
new file mode 100644
index 0000000..1b9a4ca
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorResourceManagerErrorHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.client.FailedRuntime;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.remote.RemoteMessage;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * The error handler receives all resourcemanager errors from all evaluators in the system.
+ * Its primary function is to dispatch these to the appropriate EvaluatorManager.
+ */
+@Private
+public final class EvaluatorResourceManagerErrorHandler implements EventHandler<RemoteMessage<ReefServiceProtos.RuntimeErrorProto>> {
+ private static final Logger LOG = Logger.getLogger(EvaluatorResourceManagerErrorHandler.class.toString());
+ private final Evaluators evaluators;
+
+
+ @Inject
+ EvaluatorResourceManagerErrorHandler(final Evaluators evaluators) {
+ this.evaluators = evaluators;
+ LOG.log(Level.FINE, "Instantiated 'EvaluatorResourceManagerErrorHandler'");
+ }
+
+ @Override
+ public void onNext(final RemoteMessage<ReefServiceProtos.RuntimeErrorProto> runtimeErrorProtoRemoteMessage) {
+ final ReefServiceProtos.RuntimeErrorProto runtimeErrorProto = runtimeErrorProtoRemoteMessage.getMessage();
+ final FailedRuntime error = new FailedRuntime(runtimeErrorProto);
+ final String evaluatorId = error.getId();
+ LOG.log(Level.WARNING, "Runtime error: " + error);
+
+ final EvaluatorException evaluatorException = error.getReason().isPresent() ?
+ new EvaluatorException(evaluatorId, error.getReason().get()) :
+ new EvaluatorException(evaluatorId, "Runtime error");
+
+ final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(evaluatorId);
+ if (evaluatorManager.isPresent()) {
+ evaluatorManager.get().onEvaluatorException(evaluatorException);
+ } else {
+ LOG.log(Level.WARNING, "Unknown evaluator runtime error: " + error);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
new file mode 100644
index 0000000..e1e778e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+/**
+ * Lifecycle states of an EvaluatorManager. An EvaluatorManager is created once the
+ * ResourceManager has allocated a resource, which is why {@link #ALLOCATED} is the initial state.
+ */
+@DriverSide
+@Private
+enum EvaluatorState {
+  /** Initial state: the resource has been allocated. */
+  ALLOCATED,
+  /** The client called AllocatedEvaluator.submitTask(); waiting for the first contact. */
+  SUBMITTED,
+  /** First contact received, all communication channels established, Evaluator sent to client. */
+  RUNNING,
+  // TODO: Add CLOSING state
+  /** Clean shutdown. */
+  DONE,
+  /** Some failure occurred. */
+  FAILED,
+  /** Unclean shutdown. */
+  KILLED
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
new file mode 100644
index 0000000..ba8a62f
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages Status of a single Evaluator.
+ */
+@DriverSide
+@Private
+final class EvaluatorStatusManager {
+ private static final Logger LOG = Logger.getLogger(EvaluatorStatusManager.class.getName());
+ /**
+ * The state managed.
+ */
+ private EvaluatorState state = EvaluatorState.ALLOCATED;
+
+ @Inject
+ private EvaluatorStatusManager() {
+ LOG.log(Level.FINE, "Instantiated 'EvaluatorStatusManager'");
+ }
+
+ private static boolean isLegal(final EvaluatorState from, final EvaluatorState to) {
+ // TODO
+ return true;
+ }
+
+ synchronized void setRunning() {
+ this.setState(EvaluatorState.RUNNING);
+ }
+
+ synchronized void setSubmitted() {
+ this.setState(EvaluatorState.SUBMITTED);
+ }
+
+ synchronized void setDone() {
+ this.setState(EvaluatorState.DONE);
+ }
+
+ synchronized void setFailed() {
+ this.setState(EvaluatorState.FAILED);
+ }
+
+ synchronized void setKilled() {
+ this.setState(EvaluatorState.KILLED);
+ }
+
+ synchronized boolean isRunning() {
+ return this.state.equals(EvaluatorState.RUNNING);
+ }
+
+ synchronized boolean isDoneOrFailedOrKilled() {
+ return (this.state == EvaluatorState.DONE ||
+ this.state == EvaluatorState.FAILED ||
+ this.state == EvaluatorState.KILLED);
+ }
+
+ synchronized boolean isAllocatedOrSubmittedOrRunning() {
+ return (this.state == EvaluatorState.ALLOCATED ||
+ this.state == EvaluatorState.SUBMITTED ||
+ this.state == EvaluatorState.RUNNING);
+ }
+
+ synchronized boolean isSubmitted() {
+ return EvaluatorState.SUBMITTED == this.state;
+ }
+
+ synchronized boolean isAllocated() {
+ return EvaluatorState.ALLOCATED == this.state;
+ }
+
+ @Override
+ public synchronized String toString() {
+ return this.state.toString();
+ }
+
+ private synchronized void setState(final EvaluatorState state) {
+ if (!isLegal(this.state, state)) {
+ throw new IllegalStateException("Illegal state transition from '" + this.state + "' to '" + state + "'");
+ }
+ this.state = state;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
new file mode 100644
index 0000000..dde4e70
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/Evaluators.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.util.Optional;
+import org.apache.reef.util.SingletonAsserter;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages all Evaluators.
+ * See EvaluatorManager for the Driver side representation of a single Evaluator.
+ * <p>
+ * All access to the evaluator map is guarded by this object's monitor.
+ */
+@DriverSide
+@Private
+public final class Evaluators implements AutoCloseable {
+
+  private static final Logger LOG = Logger.getLogger(Evaluators.class.getName());
+
+  /**
+   * A map between evaluatorId and the EvaluatorManager that handles this evaluator.
+   * Guarded by {@code this}.
+   */
+  private final Map<String, EvaluatorManager> evaluators = new HashMap<>();
+
+  @Inject
+  Evaluators() {
+    LOG.log(Level.FINE, "Instantiated 'Evaluators'");
+    assert (SingletonAsserter.assertSingleton(Evaluators.class));
+  }
+
+  /**
+   * Closes every managed EvaluatorManager that is not closed yet, logging a warning for each one
+   * (those are the unclean shutdowns).
+   * <p>
+   * The managers are snapshotted under the lock and closed outside of it, presumably so that
+   * EvaluatorManager.close() never runs while this object's monitor is held.
+   */
+  @Override
+  public void close() {
+    final List<EvaluatorManager> evaluatorsCopy;
+    synchronized (this) {
+      evaluatorsCopy = new ArrayList<>(this.evaluators.values());
+    }
+    for (final EvaluatorManager evaluatorManager : evaluatorsCopy) {
+      // Already-closed evaluators shut down cleanly; only warn about the ones we must force closed.
+      if (!evaluatorManager.isClosed()) {
+        LOG.log(Level.WARNING, "Unclean shutdown of evaluator {0}", evaluatorManager.getId());
+        evaluatorManager.close();
+      }
+    }
+  }
+
+  /**
+   * Return true if <em>all</em> evaluators are in closed state, as reported by
+   * EvaluatorManager.isClosed() (which, per the original contract, also implies their processing
+   * queues are empty). Returns true when no evaluators are managed.
+   */
+  public synchronized boolean allEvaluatorsAreClosed() {
+    // The map is guarded by "this" (held via the synchronized modifier); no extra lock needed.
+    for (final EvaluatorManager eval : this.evaluators.values()) {
+      if (!eval.isClosed()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @param evaluatorId id of the evaluator to look up.
+   * @return the EvaluatorManager for the given id, if one exists.
+   */
+  public synchronized Optional<EvaluatorManager> get(final String evaluatorId) {
+    return Optional.ofNullable(this.evaluators.get(evaluatorId));
+  }
+
+  /**
+   * Create new EvaluatorManager and add it to the collection.
+   * <p/>
+   * FIXME: This method is a temporary fix for the race condition
+   * described in issues #828 and #839.
+   *
+   * @param evaluatorManagerFactory Factory that builds new EvaluatorManager objects.
+   * @param evaluatorMsg            Resource allocation message that contains data on the new evaluator.
+   * @throws java.lang.IllegalArgumentException if the EvaluatorManager is already known.
+   */
+  public synchronized void put(
+      final EvaluatorManagerFactory evaluatorManagerFactory,
+      final DriverRuntimeProtocol.ResourceAllocationProto evaluatorMsg) {
+    // Creating and registering under one lock is the point of this overload (see FIXME above).
+    this.put(evaluatorManagerFactory.getNewEvaluatorManager(evaluatorMsg));
+  }
+
+  /**
+   * Adds an EvaluatorManager, keyed by its id.
+   *
+   * @param evaluatorManager the manager to register.
+   * @throws java.lang.IllegalArgumentException if an EvaluatorManager with the same id is already
+   *                                            known; the registry is left unchanged in that case.
+   */
+  public synchronized void put(final EvaluatorManager evaluatorManager) {
+    final String evaluatorId = evaluatorManager.getId();
+    final EvaluatorManager prev = this.evaluators.get(evaluatorId);
+    LOG.log(Level.FINEST, "Adding: {0} previous: {1}", new Object[]{evaluatorId, prev});
+    if (prev != null) {
+      // Check before inserting so a rejected duplicate does not overwrite the registered manager.
+      throw new IllegalArgumentException(
+          "Trying to re-add an Evaluator that is already known: " + evaluatorId);
+    }
+    this.evaluators.put(evaluatorId, evaluatorManager);
+  }
+}