You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/01/23 00:47:10 UTC
[37/51] [partial] incubator-reef git commit: [REEF-93] Move java
sources to lang/java
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java
new file mode 100644
index 0000000..450e8e5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/FailedEvaluatorImpl.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.FailedContext;
+import org.apache.reef.driver.evaluator.FailedEvaluator;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.exception.EvaluatorException;
+import org.apache.reef.util.Optional;
+
+import java.util.List;
+
+@DriverSide
+@Private
+final class FailedEvaluatorImpl implements FailedEvaluator {
+
+ final String id;
+ private final EvaluatorException ex;
+ private final List<FailedContext> ctx;
+ private final Optional<FailedTask> task;
+
+ public FailedEvaluatorImpl(final EvaluatorException ex, final List<FailedContext> ctx, final Optional<FailedTask> task, final String id) {
+ this.ex = ex;
+ this.ctx = ctx;
+ this.task = task;
+ this.id = id;
+ }
+
+ @Override
+ public EvaluatorException getEvaluatorException() {
+ return this.ex;
+ }
+
+ @Override
+ public List<FailedContext> getFailedContextList() {
+ return this.ctx;
+ }
+
+ @Override
+ public Optional<FailedTask> getFailedTask() {
+ return this.task;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public String toString() {
+ return "FailedEvaluator{" +
+ "id='" + id + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java
new file mode 100644
index 0000000..7d6deee
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-Side representations of Evaluators.
+ */
+@DriverSide
+@Private package org.apache.reef.runtime.common.driver.evaluator;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java
new file mode 100644
index 0000000..460f5a4
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/ClockIdlenessSource.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.runtime.event.IdleClock;
+
+import javax.inject.Inject;
+
+/**
+ * Informs the DriverIdleManager of clock idleness.
+ */
+public final class ClockIdlenessSource implements DriverIdlenessSource, EventHandler<IdleClock> {
+ private static final String COMPONENT_NAME = "Clock";
+ private static final String IDLE_REASON = "The clock reported idle.";
+ private static final IdleMessage IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, IDLE_REASON, true);
+ private static final String NOT_IDLE_REASON = "The clock reported not idle.";
+ private static final IdleMessage NOT_IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, NOT_IDLE_REASON, false);
+
+ private final InjectionFuture<DriverIdleManager> driverIdleManager;
+ private final Clock clock;
+
+ @Inject
+ ClockIdlenessSource(final InjectionFuture<DriverIdleManager> driverIdleManager, final Clock clock) {
+ this.driverIdleManager = driverIdleManager;
+ this.clock = clock;
+ }
+
+ @Override
+ public synchronized IdleMessage getIdleStatus() {
+ if (this.clock.isIdle()) {
+ return IDLE_MESSAGE;
+ } else {
+ return NOT_IDLE_MESSAGE;
+ }
+ }
+
+ @Override
+ public synchronized void onNext(final IdleClock idleClock) {
+ this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
new file mode 100644
index 0000000..79d4d8c
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+import org.apache.reef.driver.parameters.DriverIdleSources;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Handles the various sources for driver idleness and forwards decisions to DriverStatusManager.
+ */
+public final class DriverIdleManager {
+ private static final Logger LOG = Logger.getLogger(DriverIdleManager.class.getName());
+ private static final Level IDLE_REASONS_LEVEL = Level.FINEST;
+ private final Set<DriverIdlenessSource> idlenessSources;
+ private final InjectionFuture<DriverStatusManager> driverStatusManager;
+
+ @Inject
+ DriverIdleManager(final @Parameter(DriverIdleSources.class) Set<DriverIdlenessSource> idlenessSources,
+ final InjectionFuture<DriverStatusManager> driverStatusManager) {
+ this.idlenessSources = idlenessSources;
+ this.driverStatusManager = driverStatusManager;
+ }
+
+ public synchronized void onPotentiallyIdle(final IdleMessage reason) {
+ synchronized (driverStatusManager.get()) {
+ if (this.driverStatusManager.get().isShuttingDownOrFailing()) {
+ LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason [{1}]",
+ new Object[]{reason.getComponentName(), reason.getReason()});
+ } else {
+ boolean isIdle = true;
+ LOG.log(IDLE_REASONS_LEVEL, "Checking for idle because {0} reported idleness for reason [{1}]",
+ new Object[]{reason.getComponentName(), reason.getReason()});
+
+
+ for (final DriverIdlenessSource idlenessSource : this.idlenessSources) {
+ final IdleMessage idleMessage = idlenessSource.getIdleStatus();
+ LOG.log(IDLE_REASONS_LEVEL, "[{0}] is reporting {1} because [{2}]."
+ , new Object[]{idleMessage.getComponentName(), idleMessage.isIdle() ? "idle" : "not idle", idleMessage.getReason()}
+ );
+ isIdle &= idleMessage.isIdle();
+ }
+
+ if (isIdle) {
+ LOG.log(Level.INFO, "All components indicated idle. Initiating Driver shutdown.");
+ this.driverStatusManager.get().onComplete();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java
new file mode 100644
index 0000000..11f6ad1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdlenessSource.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+/**
+ * Implementations of this interface are used to determine driver idleness.
+ */
+public interface DriverIdlenessSource {
+
+ /**
+ * @return the idle status of the component.
+ */
+ IdleMessage getIdleStatus();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java
new file mode 100644
index 0000000..a8a87c5
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/EventHandlerIdlenessSource.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.tang.InjectionFuture;
+
+import javax.inject.Inject;
+
+/**
+ * Checks for idleness of the Event handlers.
+ */
+public final class EventHandlerIdlenessSource implements DriverIdlenessSource {
+
+ private static final IdleMessage IDLE_MESSAGE = new IdleMessage("EventHandlers", "All events have been processed.", true);
+ private static final IdleMessage NOT_IDLE_MESSAGE = new IdleMessage("EventHandlers", "Some events are still in flight.", true);
+
+ private final InjectionFuture<Evaluators> evaluators;
+ private final InjectionFuture<DriverIdleManager> driverIdleManager;
+
+ @Inject
+ EventHandlerIdlenessSource(final InjectionFuture<Evaluators> evaluators,
+ final InjectionFuture<DriverIdleManager> driverIdleManager) {
+ this.evaluators = evaluators;
+ this.driverIdleManager = driverIdleManager;
+ }
+
+
+ @Override
+ public IdleMessage getIdleStatus() {
+ if (this.evaluators.get().allEvaluatorsAreClosed()) {
+ return IDLE_MESSAGE;
+ } else {
+ return NOT_IDLE_MESSAGE;
+ }
+ }
+
+ public void check() {
+ if (this.evaluators.get().allEvaluatorsAreClosed()) {
+ this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java
new file mode 100644
index 0000000..6210458
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/IdleMessage.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.idle;
+
+/**
+ * A message to be used to indicate that the driver is potentially idle.
+ */
+public final class IdleMessage {
+
+ private final String componentName;
+ private final String reason;
+ private final boolean isIdle;
+
+ /**
+ * @param componentName the name of the component that is indicating (not) idle, e.g. "Clock"
+ * @param reason the human-readable reason the component indicates (not) idle, "No more alarms scheduled."
+ * @param isIdle whether or not the component is idle.
+ */
+ public IdleMessage(final String componentName, final String reason, final boolean isIdle) {
+ this.componentName = componentName;
+ this.reason = reason;
+ this.isIdle = isIdle;
+ }
+
+ /**
+ * @return the name of the component that indicated (not) idle.
+ */
+ public String getComponentName() {
+ return this.componentName;
+ }
+
+ /**
+ * @return the reason for (no) idleness
+ */
+ public String getReason() {
+ return this.reason;
+ }
+
+ /**
+ * @return true if this message indicates idle.
+ */
+ public boolean isIdle() {
+ return this.isIdle;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java
new file mode 100644
index 0000000..35dffa2
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Contains the Driver idle handling and extension APIs
+ */
+package org.apache.reef.runtime.common.driver.idle;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java
new file mode 100644
index 0000000..520cfa1
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of the Driver-side REEF APIs.
+ */
+package org.apache.reef.runtime.common.driver;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java
new file mode 100644
index 0000000..e5fa95e
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/NodeDescriptorHandler.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Updates the ResourceCatalog with a new Node.
+ */
+@Private
+@DriverSide
+public final class NodeDescriptorHandler implements EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> {
+ private final ResourceCatalogImpl resourceCatalog;
+
+ @Inject
+ public NodeDescriptorHandler(ResourceCatalogImpl resourceCatalog) {
+ this.resourceCatalog = resourceCatalog;
+ }
+
+ @Override
+ public void onNext(final DriverRuntimeProtocol.NodeDescriptorProto value) {
+ this.resourceCatalog.handle(value);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java
new file mode 100644
index 0000000..7d0e1e8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceAllocationHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Handles new resource allocations by adding a new EvaluatorManager.
+ */
+@Private
+@DriverSide
+public final class ResourceAllocationHandler
+ implements EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> {
+
+ /**
+ * Helper class to make new EvaluatorManager instances,
+ * given a Node they have been allocated on.
+ */
+ private final EvaluatorManagerFactory evaluatorManagerFactory;
+
+ /**
+ * The Evaluators known to the Driver.
+ */
+ private final Evaluators evaluators;
+
+ @Inject
+ ResourceAllocationHandler(
+ final EvaluatorManagerFactory evaluatorManagerFactory, final Evaluators evaluators) {
+ this.evaluatorManagerFactory = evaluatorManagerFactory;
+ this.evaluators = evaluators;
+ }
+
+ @Override
+ public void onNext(final DriverRuntimeProtocol.ResourceAllocationProto value) {
+ // FIXME: Using this put() method is a temporary fix for the race condition
+ // described in issues #828 and #839. Use Evaluators.put(EvaluatorManager) instead
+ // when the bug is fixed.
+ this.evaluators.put(this.evaluatorManagerFactory, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java
new file mode 100644
index 0000000..7f13de8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerErrorHandler.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * Informs the client and then shuts down the driver forcefully in case of Resource Manager errors.
+ */
+public final class ResourceManagerErrorHandler implements EventHandler<ReefServiceProtos.RuntimeErrorProto> {
+
+
+ private final DriverStatusManager driverStatusManager;
+
+ @Inject
+ ResourceManagerErrorHandler(final DriverStatusManager driverStatusManager) {
+ this.driverStatusManager = driverStatusManager;
+ }
+
+ @Override
+ public synchronized void onNext(final ReefServiceProtos.RuntimeErrorProto runtimeErrorProto) {
+ this.driverStatusManager.onError(new Exception("Resource Manager failure"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
new file mode 100644
index 0000000..c0db65d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.DriverStatusManager;
+import org.apache.reef.runtime.common.driver.idle.DriverIdleManager;
+import org.apache.reef.runtime.common.driver.idle.DriverIdlenessSource;
+import org.apache.reef.runtime.common.driver.idle.IdleMessage;
+import org.apache.reef.tang.InjectionFuture;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Manages the status of the Resource Manager.
+ */
+@DriverSide
+@Private
+public final class ResourceManagerStatus implements EventHandler<DriverRuntimeProtocol.RuntimeStatusProto>,
+ DriverIdlenessSource {
+ private static final Logger LOG = Logger.getLogger(ResourceManagerStatus.class.getName());
+
+ private static final String COMPONENT_NAME = "ResourceManager";
+ private static final IdleMessage IDLE_MESSAGE = new IdleMessage(COMPONENT_NAME, "No outstanding requests or allocations", true);
+
+ private final ResourceManagerErrorHandler resourceManagerErrorHandler;
+ private final DriverStatusManager driverStatusManager;
+ private final InjectionFuture<DriverIdleManager> driverIdleManager;
+
+ // Mutable state.
+ private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+ private int outstandingContainerRequests = 0;
+ private int containerAllocationCount = 0;
+
+ @Inject
+ ResourceManagerStatus(final ResourceManagerErrorHandler resourceManagerErrorHandler,
+ final DriverStatusManager driverStatusManager,
+ final InjectionFuture<DriverIdleManager> driverIdleManager) {
+ this.resourceManagerErrorHandler = resourceManagerErrorHandler;
+ this.driverStatusManager = driverStatusManager;
+ this.driverIdleManager = driverIdleManager;
+ }
+
+ @Override
+ public synchronized void onNext(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+ final ReefServiceProtos.State newState = runtimeStatusProto.getState();
+ LOG.log(Level.FINEST, "Runtime status " + runtimeStatusProto);
+ this.outstandingContainerRequests = runtimeStatusProto.getOutstandingContainerRequests();
+ this.containerAllocationCount = runtimeStatusProto.getContainerAllocationCount();
+ this.setState(runtimeStatusProto.getState());
+
+ switch (newState) {
+ case FAILED:
+ this.onRMFailure(runtimeStatusProto);
+ break;
+ case DONE:
+ this.onRMDone(runtimeStatusProto);
+ break;
+ case RUNNING:
+ this.onRMRunning(runtimeStatusProto);
+ break;
+ }
+ }
+
+ /**
+ * Change the state of the Resource Manager to be RUNNING.
+ */
+ public synchronized void setRunning() {
+ this.setState(ReefServiceProtos.State.RUNNING);
+ }
+
+ /**
+ * @return idle, if there are no outstanding requests or allocations. Not idle else.
+ */
+ @Override
+ public synchronized IdleMessage getIdleStatus() {
+ if (this.isIdle()) {
+ return IDLE_MESSAGE;
+ } else {
+ final String message = new StringBuilder("There are ")
+ .append(this.outstandingContainerRequests)
+ .append(" outstanding container requests and ")
+ .append(this.containerAllocationCount)
+ .append(" allocated containers")
+ .toString();
+ return new IdleMessage(COMPONENT_NAME, message, false);
+ }
+ }
+
+
+ private synchronized void onRMFailure(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+ assert (runtimeStatusProto.getState() == ReefServiceProtos.State.FAILED);
+ this.resourceManagerErrorHandler.onNext(runtimeStatusProto.getError());
+ }
+
+ private synchronized void onRMDone(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+ assert (runtimeStatusProto.getState() == ReefServiceProtos.State.DONE);
+ LOG.log(Level.INFO, "Resource Manager shutdown happened. Triggering Driver shutdown.");
+ this.driverStatusManager.onComplete();
+ }
+
+ private synchronized void onRMRunning(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) {
+ assert (runtimeStatusProto.getState() == ReefServiceProtos.State.RUNNING);
+ if (this.isIdle()) {
+ this.driverIdleManager.get().onPotentiallyIdle(IDLE_MESSAGE);
+ }
+ }
+
+
+ private synchronized boolean isIdle() {
+ return this.hasNoOutstandingRequests()
+ && this.hasNoContainersAllocated();
+ }
+
+ private synchronized boolean isRunning() {
+ return ReefServiceProtos.State.RUNNING.equals(this.state);
+ }
+
+
+ private synchronized void setState(ReefServiceProtos.State state) {
+ // TODO: Add state transition check
+ this.state = state;
+ }
+
+
+ private synchronized boolean hasNoOutstandingRequests() {
+ return this.outstandingContainerRequests == 0;
+ }
+
+ private synchronized boolean hasNoContainersAllocated() {
+ return this.containerAllocationCount == 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
new file mode 100644
index 0000000..9abafbd
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceStatusHandler.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.DriverRuntimeProtocol;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManagerFactory;
+import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
+import org.apache.reef.util.Optional;
+import org.apache.reef.wake.EventHandler;
+
+import javax.inject.Inject;
+
+/**
+ * A ResourceStatusProto message comes from the ResourceManager layer to indicate what it thinks
+ * about the current state of a given resource. Ideally, we should think the same thing.
+ */
+@Private
+public final class ResourceStatusHandler implements EventHandler<DriverRuntimeProtocol.ResourceStatusProto> {
+
+ private final Evaluators evaluators;
+ private final EvaluatorManagerFactory evaluatorManagerFactory;
+
+ @Inject
+ ResourceStatusHandler(final Evaluators evaluators, final EvaluatorManagerFactory evaluatorManagerFactory) {
+ this.evaluators = evaluators;
+ this.evaluatorManagerFactory = evaluatorManagerFactory;
+ }
+
+ /**
+ * This resource status message comes from the ResourceManager layer; telling me what it thinks
+ * about the state of the resource executing an Evaluator; This method simply passes the message
+ * off to the referenced EvaluatorManager
+ *
+ * @param resourceStatusProto resource status message from the ResourceManager
+ */
+ @Override
+ public void onNext(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) {
+ final Optional<EvaluatorManager> evaluatorManager = this.evaluators.get(resourceStatusProto.getIdentifier());
+ if (evaluatorManager.isPresent()) {
+ evaluatorManager.get().onResourceStatusMessage(resourceStatusProto);
+ } else {
+ if (resourceStatusProto.getIsFromPreviousDriver()) {
+ EvaluatorManager previousEvaluatorManager = this.evaluatorManagerFactory.createForEvaluatorFailedDuringDriverRestart(resourceStatusProto);
+ previousEvaluatorManager.onResourceStatusMessage(resourceStatusProto);
+ } else {
+ throw new RuntimeException(
+ "Unknown resource status from evaluator " + resourceStatusProto.getIdentifier() +
+ " with state " + resourceStatusProto.getState()
+ );
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java
new file mode 100644
index 0000000..9ffc87d
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Classes that interface with the resourcemanager (Local, YARN, ...) in the Driver.
+ */
+package org.apache.reef.runtime.common.driver.resourcemanager;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java
new file mode 100644
index 0000000..5d1b0d0
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/CompletedTaskImpl.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.CompletedTask;
+
+@Private
+@DriverSide
+public final class CompletedTaskImpl implements CompletedTask {
+
+ private final ActiveContext context;
+ private final byte[] message;
+ private final String id;
+
+ public CompletedTaskImpl(final ActiveContext context, final byte[] message, final String id) {
+ this.context = context;
+ this.message = message;
+ this.id = id;
+ }
+
+ @Override
+ public ActiveContext getActiveContext() {
+ return this.context;
+ }
+
+ @Override
+ public byte[] get() {
+ return this.message;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public String toString() {
+ return "CompletedTask{ID='" + getId() + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
new file mode 100644
index 0000000..4aa3092
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.ContextControlProto;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.StopTaskProto;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.SuspendTaskProto;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Implements the RunningTask client interface. It is mainly a helper class
+ * that will package up various client method calls into protocol buffers and
+ * pass them to its respective EvaluatorManager to deliver to the EvaluatorRuntime.
+ */
+@Private
+@DriverSide
+public final class RunningTaskImpl implements RunningTask {
+
+ private final static Logger LOG = Logger.getLogger(RunningTask.class.getName());
+
+ private final EvaluatorManager evaluatorManager;
+ private final EvaluatorContext evaluatorContext;
+ private final String taskId;
+ private final TaskRepresenter taskRepresenter;
+
+ public RunningTaskImpl(final EvaluatorManager evaluatorManager,
+ final String taskId,
+ final EvaluatorContext evaluatorContext,
+ final TaskRepresenter taskRepresenter) {
+ LOG.log(Level.FINEST, "INIT: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]");
+
+ this.evaluatorManager = evaluatorManager;
+ this.evaluatorContext = evaluatorContext;
+ this.taskId = taskId;
+ this.taskRepresenter = taskRepresenter;
+ }
+
+
+ @Override
+ public ActiveContext getActiveContext() {
+ return this.evaluatorContext;
+ }
+
+ @Override
+ public String getId() {
+ return this.taskId;
+ }
+
+ @Override
+ public void send(final byte[] message) {
+ LOG.log(Level.FINEST, "MESSAGE: Task id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]");
+
+ final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+ .setTaskMessage(ByteString.copyFrom(message))
+ .build();
+
+ this.evaluatorManager.sendContextControlMessage(contextControlProto);
+ }
+
+ @Override
+ public void close() {
+ LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]");
+
+ if (this.taskRepresenter.isNotRunning()) {
+ LOG.log(Level.FINE, "Ignoring call to .close() because the task is no longer RUNNING.");
+ } else {
+ final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+ .setStopTask(StopTaskProto.newBuilder().build())
+ .build();
+ this.evaluatorManager.sendContextControlMessage(contextControlProto);
+ }
+ }
+
+ @Override
+ public void close(final byte[] message) {
+ LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "] with message.");
+ if (this.taskRepresenter.isNotRunning()) {
+ throw new RuntimeException("Trying to send a message to a Task that is no longer RUNNING.");
+ }
+
+ final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+ .setStopTask(StopTaskProto.newBuilder().build())
+ .setTaskMessage(ByteString.copyFrom(message))
+ .build();
+ this.evaluatorManager.sendContextControlMessage(contextControlProto);
+ }
+
+ @Override
+ public void suspend(final byte[] message) {
+ LOG.log(Level.FINEST, "SUSPEND: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "] with message.");
+
+ final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+ .setSuspendTask(SuspendTaskProto.newBuilder().build())
+ .setTaskMessage(ByteString.copyFrom(message))
+ .build();
+ this.evaluatorManager.sendContextControlMessage(contextControlProto);
+ }
+
+ @Override
+ public void suspend() {
+ LOG.log(Level.FINEST, "SUSPEND: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]");
+
+ final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+ .setSuspendTask(SuspendTaskProto.newBuilder().build())
+ .build();
+ this.evaluatorManager.sendContextControlMessage(contextControlProto);
+ }
+
+ @Override
+ public String toString() {
+ return "RunningTask{taskId='" + taskId + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java
new file mode 100644
index 0000000..e750954
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/SuspendedTaskImpl.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.SuspendedTask;
+
+@Private
+@DriverSide
+public final class SuspendedTaskImpl implements SuspendedTask {
+ private final ActiveContext context;
+ private final byte[] message;
+ private final String id;
+
+ public SuspendedTaskImpl(final ActiveContext context, final byte[] message, final String id) {
+ this.context = context;
+ this.message = message;
+ this.id = id;
+ }
+
+ @Override
+ public ActiveContext getActiveContext() {
+ return this.context;
+ }
+
+ @Override
+ public byte[] get() {
+ return this.message;
+ }
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public String toString() {
+ return "SuspendedTask{ID='" + getId() + "'}";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
new file mode 100644
index 0000000..5a29b47
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskMessageImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.task.TaskMessage;
+
+@Private
+@DriverSide
+public final class TaskMessageImpl implements TaskMessage {
+
+ private final byte[] theMessage;
+ private final String taskId;
+ private final String contextId;
+ private final String theMessageSourceId;
+
+ public TaskMessageImpl(final byte[] theMessage, final String taskId,
+ final String contextId, final String theMessageSourceId) {
+ this.theMessage = theMessage;
+ this.taskId = taskId;
+ this.contextId = contextId;
+ this.theMessageSourceId = theMessageSourceId;
+ }
+
+ @Override
+ public byte[] get() {
+ return this.theMessage;
+ }
+
+ @Override
+ public String getId() {
+ return this.taskId;
+ }
+
+ @Override
+ public String getContextId() {
+ return this.contextId;
+ }
+
+ @Override
+ public final String getMessageSourceID() {
+ return this.theMessageSourceId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
new file mode 100644
index 0000000..1c65197
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.driver.task.FailedTask;
+import org.apache.reef.driver.task.RunningTask;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
+import org.apache.reef.runtime.common.driver.evaluator.EvaluatorMessageDispatcher;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.util.Optional;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Represents a Task on the Driver.
+ */
+@DriverSide
+@Private
+public final class TaskRepresenter {
+
+ private static final Logger LOG = Logger.getLogger(TaskRepresenter.class.getName());
+
+ private final EvaluatorContext context;
+ private final EvaluatorMessageDispatcher messageDispatcher;
+ private final EvaluatorManager evaluatorManager;
+ private final ExceptionCodec exceptionCodec;
+ private final String taskId;
+
+ // Mutable state
+ private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+
+ public TaskRepresenter(final String taskId,
+ final EvaluatorContext context,
+ final EvaluatorMessageDispatcher messageDispatcher,
+ final EvaluatorManager evaluatorManager,
+ final ExceptionCodec exceptionCodec) {
+ this.taskId = taskId;
+ this.context = context;
+ this.messageDispatcher = messageDispatcher;
+ this.evaluatorManager = evaluatorManager;
+ this.exceptionCodec = exceptionCodec;
+ }
+
+ private static byte[] getResult(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+ return taskStatusProto.hasResult() ? taskStatusProto.getResult().toByteArray() : null;
+ }
+
+ public void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+
+ LOG.log(Level.FINE, "Received task {0} status {1}",
+ new Object[]{taskStatusProto.getTaskId(), taskStatusProto.getState()});
+
+ // Make sure that the message is indeed for us.
+ if (!taskStatusProto.getContextId().equals(this.context.getId())) {
+ throw new RuntimeException(
+ "Received a message for a task running on Context " + taskStatusProto.getContextId() +
+ " while the Driver believes this Task to be run on Context " + this.context.getId());
+ }
+
+ if (!taskStatusProto.getTaskId().equals(this.taskId)) {
+ throw new RuntimeException("Received a message for task " + taskStatusProto.getTaskId() +
+ " in the TaskRepresenter for Task " + this.taskId);
+ }
+ if (taskStatusProto.getRecovery()) {
+ // when a recovered heartbeat is received, we will take its word for it
+ LOG.log(Level.INFO, "Received task status {0} for RECOVERED task {1}.",
+ new Object[]{taskStatusProto.getState(), this.taskId});
+ this.setState(taskStatusProto.getState());
+ }
+ // Dispatch the message to the right method.
+ switch (taskStatusProto.getState()) {
+ case INIT:
+ this.onTaskInit(taskStatusProto);
+ break;
+ case RUNNING:
+ this.onTaskRunning(taskStatusProto);
+ break;
+ case SUSPEND:
+ this.onTaskSuspend(taskStatusProto);
+ break;
+ case DONE:
+ this.onTaskDone(taskStatusProto);
+ break;
+ case FAILED:
+ this.onTaskFailed(taskStatusProto);
+ break;
+ default:
+ throw new IllegalStateException("Unknown task state: " + taskStatusProto.getState());
+ }
+ }
+
+ private void onTaskInit(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+ assert ((ReefServiceProtos.State.INIT == taskStatusProto.getState()));
+ if (this.isKnown()) {
+ LOG.log(Level.WARNING, "Received a INIT message for task with id {0}" +
+ " which we have seen before. Ignoring the second message", this.taskId);
+ } else {
+ final RunningTask runningTask = new RunningTaskImpl(
+ this.evaluatorManager, this.taskId, this.context, this);
+ this.messageDispatcher.onTaskRunning(runningTask);
+ this.setState(ReefServiceProtos.State.RUNNING);
+ }
+ }
+
+ private void onTaskRunning(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+
+ assert (taskStatusProto.getState() == ReefServiceProtos.State.RUNNING);
+
+ if (this.isNotRunning()) {
+ throw new IllegalStateException("Received a task status message from task " + this.taskId +
+ " that is believed to be RUNNING on the Evaluator, but the Driver thinks it is in state " + this.state);
+ }
+
+ // fire driver restart task running handler if this is a recovery heartbeat
+ if (taskStatusProto.getRecovery()) {
+ final RunningTask runningTask = new RunningTaskImpl(
+ this.evaluatorManager, this.taskId, this.context, this);
+ this.messageDispatcher.onDriverRestartTaskRunning(runningTask);
+ }
+
+ for (final ReefServiceProtos.TaskStatusProto.TaskMessageProto taskMessageProto : taskStatusProto.getTaskMessageList()) {
+ this.messageDispatcher.onTaskMessage(
+ new TaskMessageImpl(taskMessageProto.getMessage().toByteArray(),
+ this.taskId, this.context.getId(), taskMessageProto.getSourceId()));
+ }
+ }
+
+ private void onTaskSuspend(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+ assert (ReefServiceProtos.State.SUSPEND == taskStatusProto.getState());
+ assert (this.isKnown());
+ this.messageDispatcher.onTaskSuspended(
+ new SuspendedTaskImpl(this.context, getResult(taskStatusProto), this.taskId));
+ this.setState(ReefServiceProtos.State.SUSPEND);
+ }
+
+ private void onTaskDone(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+ assert (ReefServiceProtos.State.DONE == taskStatusProto.getState());
+ assert (this.isKnown());
+ this.messageDispatcher.onTaskCompleted(
+ new CompletedTaskImpl(this.context, getResult(taskStatusProto), this.taskId));
+ this.setState(ReefServiceProtos.State.DONE);
+ }
+
+ private void onTaskFailed(final ReefServiceProtos.TaskStatusProto taskStatusProto) {
+ assert (ReefServiceProtos.State.FAILED == taskStatusProto.getState());
+ final Optional<ActiveContext> evaluatorContext = Optional.<ActiveContext>of(this.context);
+ final Optional<byte[]> bytes = Optional.ofNullable(getResult(taskStatusProto));
+ final Optional<Throwable> exception = this.exceptionCodec.fromBytes(bytes);
+ final String message = exception.isPresent() ? exception.get().getMessage() : "No message given";
+ final Optional<String> description = Optional.empty();
+ final FailedTask failedTask = new FailedTask(
+ this.taskId, message, description, exception, bytes, evaluatorContext);
+ this.messageDispatcher.onTaskFailed(failedTask);
+ this.setState(ReefServiceProtos.State.FAILED);
+ }
+
+ public String getId() {
+ return this.taskId;
+ }
+
+ /**
+ * @return true, if we had at least one message from the task.
+ */
+ private boolean isKnown() {
+ return this.state != ReefServiceProtos.State.INIT;
+ }
+
+ /**
+ * @return true, if this task is in any other state but RUNNING.
+ */
+ public boolean isNotRunning() {
+ return this.state != ReefServiceProtos.State.RUNNING;
+ }
+
+ private void setState(final ReefServiceProtos.State newState) {
+ LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]",
+ new Object[]{this.taskId, this.state, newState});
+ this.state = newState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java
new file mode 100644
index 0000000..ea55cca
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Driver-Side representations of tasks.
+ */
+@Private
+@DriverSide package org.apache.reef.runtime.common.driver.task;
+
+import org.apache.reef.annotations.audience.DriverSide;
+import org.apache.reef.annotations.audience.Private;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java
new file mode 100644
index 0000000..61ca305
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DefaultDriverConnection.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Default implementation for re-establishing connection to driver after driver restart.
+ * In this default implementation, information about re-started driver will be obtained
+ * by querying Http server.
+ */
+public final class DefaultDriverConnection implements DriverConnection {
+
+ private static final Logger LOG = Logger.getLogger(DefaultDriverConnection.class.getName());
+
+ @Inject
+ public DefaultDriverConnection() {
+ }
+
+ @Override
+ public String getDriverRemoteIdentifier() {
+ LOG.log(Level.FINE, "Trying to get driver remote identifier by querying Http server.");
+ // TODO: implement a proper mechanism to obtain driver remote identifier.
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java
new file mode 100644
index 0000000..a3ce3f3
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/DriverConnection.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+/**
+ * Interface used for reconnecting to driver after driver restart
+ */
+public interface DriverConnection extends AutoCloseable {
+
+ /**
+ * @return driver remove identifier to facilitate reconnection to the (restarted) driver
+ */
+ String getDriverRemoteIdentifier();
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java
new file mode 100644
index 0000000..43d7469
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import org.apache.reef.runtime.common.evaluator.parameters.*;
+import org.apache.reef.tang.ExternalConstructor;
+import org.apache.reef.tang.formats.ConfigurationModule;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
+import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.time.Clock;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * The runtime configuration of an evaluator.
+ */
+public final class EvaluatorConfiguration extends ConfigurationModuleBuilder {
+ public static final RequiredParameter<String> DRIVER_REMOTE_IDENTIFIER = new RequiredParameter<>();
+ public static final RequiredParameter<String> EVALUATOR_IDENTIFIER = new RequiredParameter<>();
+ public static final RequiredParameter<String> ROOT_CONTEXT_CONFIGURATION = new RequiredParameter<>();
+ public static final OptionalParameter<String> ROOT_SERVICE_CONFIGURATION = new OptionalParameter<>();
+ public static final OptionalParameter<String> TASK_CONFIGURATION = new OptionalParameter<>();
+ public static final OptionalParameter<Integer> HEARTBEAT_PERIOD = new OptionalParameter<>();
+ public static final OptionalParameter<String> APPLICATION_IDENTIFIER = new OptionalParameter<>();
+ public static final ConfigurationModule CONF = new EvaluatorConfiguration()
+ .bindSetEntry(Clock.RuntimeStartHandler.class, EvaluatorRuntime.RuntimeStartHandler.class)
+ .bindSetEntry(Clock.RuntimeStopHandler.class, EvaluatorRuntime.RuntimeStopHandler.class)
+ .bindConstructor(ExecutorService.class, ExecutorServiceConstructor.class)
+ .bindNamedParameter(DriverRemoteIdentifier.class, DRIVER_REMOTE_IDENTIFIER)
+ .bindNamedParameter(EvaluatorIdentifier.class, EVALUATOR_IDENTIFIER)
+ .bindNamedParameter(HeartbeatPeriod.class, HEARTBEAT_PERIOD)
+ .bindNamedParameter(RootContextConfiguration.class, ROOT_CONTEXT_CONFIGURATION)
+ .bindNamedParameter(InitialTaskConfiguration.class, TASK_CONFIGURATION)
+ .bindNamedParameter(RootServiceConfiguration.class, ROOT_SERVICE_CONFIGURATION)
+ .bindNamedParameter(ApplicationIdentifier.class, APPLICATION_IDENTIFIER)
+ .build();
+
+ private final static class ExecutorServiceConstructor implements ExternalConstructor<ExecutorService> {
+
+ @Inject
+ ExecutorServiceConstructor() {
+ }
+
+ @Override
+ public ExecutorService newInstance() {
+ return Executors.newCachedThreadPool();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/53ea32cc/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
new file mode 100644
index 0000000..b239ee8
--- /dev/null
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorRuntime.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.evaluator;
+
+import com.google.protobuf.ByteString;
+import org.apache.reef.annotations.audience.EvaluatorSide;
+import org.apache.reef.proto.EvaluatorRuntimeProtocol.EvaluatorControlProto;
+import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.proto.ReefServiceProtos.EvaluatorStatusProto;
+import org.apache.reef.runtime.common.evaluator.context.ContextManager;
+import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.EvaluatorIdentifier;
+import org.apache.reef.runtime.common.evaluator.parameters.HeartbeatPeriod;
+import org.apache.reef.runtime.common.utils.ExceptionCodec;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.annotations.Unit;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Clock;
+import org.apache.reef.wake.time.runtime.event.RuntimeStart;
+import org.apache.reef.wake.time.runtime.event.RuntimeStop;
+
+import javax.inject.Inject;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+@Unit
+@EvaluatorSide
+final class EvaluatorRuntime implements EventHandler<EvaluatorControlProto> {
+
+ private final static Logger LOG = Logger.getLogger(EvaluatorRuntime.class.getName());
+
+ private final HeartBeatManager heartBeatManager;
+ private final ContextManager contextManager;
+ private final Clock clock;
+
+ private final String evaluatorIdentifier;
+ private final ExceptionCodec exceptionCodec;
+ private final AutoCloseable evaluatorControlChannel;
+
+ private ReefServiceProtos.State state = ReefServiceProtos.State.INIT;
+
+ @Inject
+ private EvaluatorRuntime(
+ final @Parameter(HeartbeatPeriod.class) int heartbeatPeriod,
+ final @Parameter(EvaluatorIdentifier.class) String evaluatorIdentifier,
+ final @Parameter(DriverRemoteIdentifier.class) String driverRID,
+ final HeartBeatManager.HeartbeatAlarmHandler heartbeatAlarmHandler,
+ final HeartBeatManager heartBeatManager,
+ final Clock clock,
+ final ContextManager contextManagerFuture,
+ final RemoteManager remoteManager,
+ final ExceptionCodec exceptionCodec) {
+
+ this.heartBeatManager = heartBeatManager;
+ this.contextManager = contextManagerFuture;
+ this.clock = clock;
+
+ this.evaluatorIdentifier = evaluatorIdentifier;
+ this.exceptionCodec = exceptionCodec;
+ this.evaluatorControlChannel =
+ remoteManager.registerHandler(driverRID, EvaluatorControlProto.class, this);
+
+ // start the heartbeats
+ clock.scheduleAlarm(heartbeatPeriod, heartbeatAlarmHandler);
+ }
+
+ private void onEvaluatorControlMessage(final EvaluatorControlProto message) {
+
+ synchronized (this.heartBeatManager) {
+
+ LOG.log(Level.FINEST, "Evaluator control message");
+
+ if (!message.getIdentifier().equals(this.evaluatorIdentifier)) {
+ this.onException(new RuntimeException(
+ "Identifier mismatch: message for evaluator id[" + message.getIdentifier()
+ + "] sent to evaluator id[" + this.evaluatorIdentifier + "]"
+ ));
+ } else if (ReefServiceProtos.State.RUNNING != this.state) {
+ this.onException(new RuntimeException(
+ "Evaluator sent a control message but its state is not "
+ + ReefServiceProtos.State.RUNNING + " but rather " + this.state
+ ));
+ } else {
+
+ if (message.hasContextControl()) {
+
+ LOG.log(Level.FINEST, "Send task control message to ContextManager");
+
+ try {
+ this.contextManager.handleContextControlProtocol(message.getContextControl());
+ if (this.contextManager.contextStackIsEmpty() && this.state == ReefServiceProtos.State.RUNNING) {
+ this.state = ReefServiceProtos.State.DONE;
+ this.heartBeatManager.sendEvaluatorStatus(this.getEvaluatorStatus());
+ this.clock.close();
+ }
+ } catch (final Throwable e) {
+ this.onException(e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (message.hasKillEvaluator()) {
+ LOG.log(Level.SEVERE, "Evaluator {0} has been killed by the driver.", this.evaluatorIdentifier);
+ this.state = ReefServiceProtos.State.KILLED;
+ this.clock.close();
+ }
+ }
+ }
+ }
+
+ private final void onException(final Throwable exception) {
+ synchronized (this.heartBeatManager) {
+ this.state = ReefServiceProtos.State.FAILED;
+
+ final EvaluatorStatusProto evaluatorStatusProto = EvaluatorStatusProto.newBuilder()
+ .setEvaluatorId(this.evaluatorIdentifier)
+ .setError(ByteString.copyFrom(this.exceptionCodec.toBytes(exception)))
+ .setState(this.state)
+ .build();
+ this.heartBeatManager.sendEvaluatorStatus(evaluatorStatusProto);
+ this.contextManager.close();
+ }
+ }
+
+ public EvaluatorStatusProto getEvaluatorStatus() {
+ synchronized (this.heartBeatManager) {
+ LOG.log(Level.FINEST, "Evaluator heartbeat: state = {0}", this.state);
+ final EvaluatorStatusProto.Builder evaluatorStatus =
+ EvaluatorStatusProto.newBuilder()
+ .setEvaluatorId(this.evaluatorIdentifier)
+ .setState(this.state);
+ return evaluatorStatus.build();
+ }
+ }
+
+ final ReefServiceProtos.State getState() {
+ return this.state;
+ }
+
+ boolean isRunning() {
+ return this.state == ReefServiceProtos.State.RUNNING;
+ }
+
+ @Override
+ public void onNext(EvaluatorControlProto evaluatorControlProto) {
+ this.onEvaluatorControlMessage(evaluatorControlProto);
+ }
+
+ final class RuntimeStartHandler implements EventHandler<RuntimeStart> {
+
+ @Override
+ public final void onNext(final RuntimeStart runtimeStart) {
+ synchronized (EvaluatorRuntime.this.heartBeatManager) {
+ try {
+ LOG.log(Level.FINEST, "runtime start");
+ assert (ReefServiceProtos.State.INIT == EvaluatorRuntime.this.state);
+ EvaluatorRuntime.this.state = ReefServiceProtos.State.RUNNING;
+ EvaluatorRuntime.this.contextManager.start();
+ EvaluatorRuntime.this.heartBeatManager.sendHeartbeat();
+ } catch (final Throwable e) {
+ EvaluatorRuntime.this.onException(e);
+ }
+ }
+ }
+ }
+
+ final class RuntimeStopHandler implements EventHandler<RuntimeStop> {
+
+ @Override
+ public final void onNext(final RuntimeStop runtimeStop) {
+ synchronized (EvaluatorRuntime.this.heartBeatManager) {
+ LOG.log(Level.FINEST, "EvaluatorRuntime shutdown invoked for Evaluator {0} in state {1}",
+ new Object[]{evaluatorIdentifier, state});
+
+ if (EvaluatorRuntime.this.isRunning()) {
+ EvaluatorRuntime.this.onException(new RuntimeException(
+ "RuntimeStopHandler invoked in state RUNNING.", runtimeStop.getException()));
+ } else {
+ EvaluatorRuntime.this.contextManager.close();
+ try {
+ EvaluatorRuntime.this.evaluatorControlChannel.close();
+ } catch (final Exception e) {
+ LOG.log(Level.SEVERE, "Exception during shutdown of evaluatorControlChannel.", e);
+ }
+ LOG.log(Level.FINEST, "EvaluatorRuntime shutdown complete");
+ }
+ }
+ }
+ }
+}