You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/09/16 22:33:40 UTC
reef git commit: [REEF-1557] Refactor State, EvaluatorState,
and related classes.
Repository: reef
Updated Branches:
refs/heads/master a9d6d6a66 -> 719e64459
[REEF-1557] Refactor State, EvaluatorState, and related classes.
* Move all state-checking logic into corresponding enums.
* Make state atomic in EvaluatorStatusManager.
* Add javadocs and clean up the code.
* Refactor EvaluatorManager for readability.
JIRA:
[REEF-1557](https://issues.apache.org/jira/browse/REEF-1557)
Pull request:
This closes #1118
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/719e6445
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/719e6445
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/719e6445
Branch: refs/heads/master
Commit: 719e6445951b0214c1b687ac85def666e5bc2c4a
Parents: a9d6d6a
Author: Sergiy Matusevych <mo...@apache.org>
Authored: Fri Sep 9 11:43:02 2016 -0700
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Fri Sep 16 15:31:13 2016 -0700
----------------------------------------------------------------------
.../driver/restart/EvaluatorRestartInfo.java | 23 +-
.../driver/restart/EvaluatorRestartState.java | 37 ++-
.../runtime/common/driver/DriverStatus.java | 54 ++++-
.../common/driver/DriverStatusManager.java | 77 +++---
.../driver/evaluator/EvaluatorManager.java | 240 +++++++++++--------
.../common/driver/evaluator/EvaluatorState.java | 156 +++++++++++-
.../evaluator/EvaluatorStatusManager.java | 218 ++++++++---------
.../evaluator/pojos/EvaluatorStatusPOJO.java | 48 +---
.../common/driver/evaluator/pojos/State.java | 121 +++++++++-
.../driver/evaluator/pojos/TaskStatusPOJO.java | 70 ++----
.../common/driver/idle/DriverIdleManager.java | 2 +-
.../resourcemanager/ResourceManagerStatus.java | 72 +-----
12 files changed, 676 insertions(+), 442 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
index 8f35af5..27baa61 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartInfo.java
@@ -33,7 +33,9 @@ import org.apache.reef.runtime.common.driver.resourcemanager.ResourceRecoverEven
@DriverSide
@Unstable
public final class EvaluatorRestartInfo {
+
private final ResourceRecoverEvent resourceRecoverEvent;
+
private EvaluatorRestartState evaluatorRestartState;
/**
@@ -44,11 +46,19 @@ public final class EvaluatorRestartInfo {
return new EvaluatorRestartInfo(resourceRecoverEvent, EvaluatorRestartState.EXPECTED);
}
+ private EvaluatorRestartInfo(
+ final ResourceRecoverEvent resourceRecoverEvent, final EvaluatorRestartState evaluatorRestartState) {
+
+ this.resourceRecoverEvent = resourceRecoverEvent;
+ this.evaluatorRestartState = evaluatorRestartState;
+ }
+
/**
* Creates an {@link EvaluatorRestartInfo} object that represents the information of an evaluator that
* has failed on driver restart.
*/
public static EvaluatorRestartInfo createFailedEvaluatorInfo(final String evaluatorId) {
+
final ResourceRecoverEvent resourceRecoverEvent =
ResourceEventImpl.newRecoveryBuilder().setIdentifier(evaluatorId).build();
@@ -61,31 +71,26 @@ public final class EvaluatorRestartInfo {
* recovered evaluator on restart.
*/
public ResourceRecoverEvent getResourceRecoverEvent() {
- return resourceRecoverEvent;
+ return this.resourceRecoverEvent;
}
/**
* @return the current process of the restart.
*/
public EvaluatorRestartState getEvaluatorRestartState() {
- return evaluatorRestartState;
+ return this.evaluatorRestartState;
}
/**
* sets the current process of the restart.
*/
public boolean setEvaluatorRestartState(final EvaluatorRestartState to) {
- if (EvaluatorRestartState.isLegalTransition(evaluatorRestartState, to)) {
+
+ if (this.evaluatorRestartState.isLegalTransition(to)) {
this.evaluatorRestartState = to;
return true;
}
return false;
}
-
- private EvaluatorRestartInfo(final ResourceRecoverEvent resourceRecoverEvent,
- final EvaluatorRestartState evaluatorRestartState) {
- this.resourceRecoverEvent = resourceRecoverEvent;
- this.evaluatorRestartState = evaluatorRestartState;
- }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
index a9b2d94..c48c494 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/restart/EvaluatorRestartState.java
@@ -29,6 +29,7 @@ import org.apache.reef.annotations.audience.Private;
@DriverSide
@Unstable
public enum EvaluatorRestartState {
+
/**
* The evaluator is not a restarted instance. Not expecting.
*/
@@ -65,32 +66,51 @@ public enum EvaluatorRestartState {
FAILED;
/**
+ * Check if the transition of {@link EvaluatorRestartState} from one state to another is legal.
+ * @param fromState start state.
+ * @param toState destination state.
* @return true if the transition of {@link EvaluatorRestartState} is legal.
+ * @deprecated TODO[JIRA REEF-1560] Use non-static method instead. Remove after version 0.16
*/
- public static boolean isLegalTransition(final EvaluatorRestartState from, final EvaluatorRestartState to) {
- switch(from) {
+ @Deprecated
+ public static boolean isLegalTransition(
+ final EvaluatorRestartState fromState, final EvaluatorRestartState toState) {
+ return fromState.isLegalTransition(toState);
+ }
+
+ /**
+ * Check if the transition of {@link EvaluatorRestartState} from current state to the given one is legal.
+ * @param toState destination state.
+ * @return true if the transition is legal, false otherwise.
+ */
+ public final boolean isLegalTransition(final EvaluatorRestartState toState) {
+
+ switch(this) {
case EXPECTED:
- switch(to) {
+ switch(toState) {
case EXPIRED:
case REPORTED:
return true;
default:
return false;
}
+
case REPORTED:
- switch(to) {
+ switch(toState) {
case REREGISTERED:
return true;
default:
return false;
}
+
case REREGISTERED:
- switch(to) {
+ switch(toState) {
case PROCESSED:
return true;
default:
return false;
}
+
default:
return false;
}
@@ -135,4 +155,11 @@ public enum EvaluatorRestartState {
return false;
}
}
+
+ /**
+ * @return true if the evaluator has had its recovery heartbeat processed.
+ */
+ public boolean isReregistered() {
+ return this == REREGISTERED;
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
index 1999329..922137f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatus.java
@@ -22,9 +22,61 @@ package org.apache.reef.runtime.common.driver;
* The status of the Driver.
*/
public enum DriverStatus {
+
PRE_INIT,
INIT,
RUNNING,
SHUTTING_DOWN,
- FAILING
+ FAILING;
+
+ /**
+ * Check if the driver is in process of shutting down (either gracefully or due to an error).
+ * @return true if the driver is shutting down (gracefully or otherwise).
+ */
+ public boolean isClosing() {
+ return this == SHUTTING_DOWN || this == FAILING;
+ }
+
+ /**
+ * Check whether a driver state transition from current state to a given one is legal.
+ * @param toStatus Destination state.
+ * @return true if transition is valid, false otherwise.
+ */
+ public boolean isLegalTransition(final DriverStatus toStatus) {
+
+ switch (this) {
+
+ case PRE_INIT:
+ switch (toStatus) {
+ case INIT:
+ return true;
+ default:
+ return false;
+ }
+
+ case INIT:
+ switch (toStatus) {
+ case RUNNING:
+ return true;
+ default:
+ return false;
+ }
+
+ case RUNNING:
+ switch (toStatus) {
+ case SHUTTING_DOWN:
+ case FAILING:
+ return true;
+ default:
+ return false;
+ }
+
+ case FAILING:
+ case SHUTTING_DOWN:
+ return false;
+
+ default:
+ throw new IllegalStateException("Unknown input state: " + this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
index 129e702..6084431 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/DriverStatusManager.java
@@ -76,44 +76,6 @@ public final class DriverStatusManager {
}
/**
- * Check whether a state transition 'from->to' is legal.
- * @param from Source state.
- * @param to Destination state.
- * @return true if transition is valid, false otherwise.
- */
- private static boolean isLegalTransition(final DriverStatus from, final DriverStatus to) {
- switch (from) {
- case PRE_INIT:
- switch (to) {
- case INIT:
- return true;
- default:
- return false;
- }
- case INIT:
- switch (to) {
- case RUNNING:
- return true;
- default:
- return false;
- }
- case RUNNING:
- switch (to) {
- case SHUTTING_DOWN:
- case FAILING:
- return true;
- default:
- return false;
- }
- case FAILING:
- case SHUTTING_DOWN:
- return false;
- default:
- throw new IllegalStateException("Unknown input state: " + from);
- }
- }
-
- /**
* Changes the driver status to INIT and sends message to the client about the transition.
*/
public synchronized void onInit() {
@@ -134,7 +96,7 @@ public final class DriverStatusManager {
LOG.entering(CLASS_NAME, "onRunning");
- if (this.driverStatus.equals(DriverStatus.PRE_INIT)) {
+ if (this.driverStatus == DriverStatus.PRE_INIT) {
this.onInit();
}
@@ -152,7 +114,7 @@ public final class DriverStatusManager {
LOG.entering(CLASS_NAME, "onError", exception);
- if (this.isShuttingDownOrFailing()) {
+ if (this.isClosing()) {
LOG.log(Level.WARNING, "Received an exception while already in shutdown.", exception);
} else {
LOG.log(Level.WARNING, "Shutting down the Driver with an exception: ", exception);
@@ -171,15 +133,18 @@ public final class DriverStatusManager {
LOG.entering(CLASS_NAME, "onComplete");
- if (this.isShuttingDownOrFailing()) {
+ if (this.isClosing()) {
LOG.log(Level.WARNING, "Ignoring second call to onComplete()",
new Exception("Dummy exception to get the call stack"));
} else {
+
LOG.log(Level.INFO, "Clean shutdown of the Driver.");
+
if (LOG.isLoggable(Level.FINEST)) {
LOG.log(Level.FINEST, "Call stack: ",
new Exception("Dummy exception to get the call stack"));
}
+
this.clock.close();
this.setStatus(DriverStatus.SHUTTING_DOWN);
}
@@ -201,9 +166,10 @@ public final class DriverStatusManager {
* @deprecated TODO[JIRA REEF-1548] Do not use DriverStatusManager as a proxy to the job client.
* After release 0.16, make this method private and use it inside onRuntimeStop() method instead.
*/
+ @Deprecated
public synchronized void sendJobEndingMessageToClient(final Optional<Throwable> exception) {
- if (!this.isShuttingDownOrFailing()) {
+ if (!this.isClosing()) {
LOG.log(Level.SEVERE, "Sending message in a state different that SHUTTING_DOWN or FAILING. " +
"This is likely a illegal call to clock.close() at play. Current state: {0}", this.driverStatus);
}
@@ -234,21 +200,34 @@ public final class DriverStatusManager {
this.driverTerminationHasBeenCommunicatedToClient = true;
}
+ /**
+ * Check if the driver is in process of shutting down (either gracefully or due to an error).
+ * @return true if the driver is shutting down (gracefully or otherwise).
+ * @deprecated TODO[JIRA REEF-1560] Use isClosing() method instead. Remove after version 0.16
+ */
+ @Deprecated
public synchronized boolean isShuttingDownOrFailing() {
- return DriverStatus.SHUTTING_DOWN.equals(this.driverStatus)
- || DriverStatus.FAILING.equals(this.driverStatus);
+ return this.isClosing();
+ }
+
+ /**
+ * Check if the driver is in process of shutting down (either gracefully or due to an error).
+ * @return true if the driver is shutting down (gracefully or otherwise).
+ */
+ public synchronized boolean isClosing() {
+ return this.driverStatus.isClosing();
}
/**
* Helper method to set the status.
* This also checks whether the transition from the current status to the new one is legal.
- * @param newStatus Driver status to transition to.
+ * @param toStatus Driver status to transition to.
*/
- private synchronized void setStatus(final DriverStatus newStatus) {
- if (isLegalTransition(this.driverStatus, newStatus)) {
- this.driverStatus = newStatus;
+ private synchronized void setStatus(final DriverStatus toStatus) {
+ if (this.driverStatus.isLegalTransition(toStatus)) {
+ this.driverStatus = toStatus;
} else {
- LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {this.driverStatus, newStatus});
+ LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {this.driverStatus, toStatus});
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index d4b8997..fc77380 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -25,6 +25,7 @@ import org.apache.reef.driver.parameters.EvaluatorConfigurationProviders;
import org.apache.reef.driver.restart.DriverRestartManager;
import org.apache.reef.driver.restart.EvaluatorRestartState;
import org.apache.reef.exception.NonSerializableException;
+import org.apache.reef.runtime.common.driver.api.*;
import org.apache.reef.runtime.common.driver.evaluator.pojos.ContextStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.EvaluatorStatusPOJO;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
@@ -41,10 +42,6 @@ import org.apache.reef.io.naming.Identifiable;
import org.apache.reef.proto.EvaluatorRuntimeProtocol;
import org.apache.reef.proto.ReefServiceProtos;
import org.apache.reef.driver.evaluator.EvaluatorProcess;
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseEventImpl;
-import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;
-import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler;
import org.apache.reef.runtime.common.driver.context.ContextControlHandler;
import org.apache.reef.runtime.common.driver.context.ContextRepresenters;
import org.apache.reef.runtime.common.driver.idle.EventHandlerIdlenessSource;
@@ -111,17 +108,19 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
// Mutable fields
private Optional<TaskRepresenter> task = Optional.empty();
- private boolean isResourceReleased = false;
- private boolean allocationFired = false;
+ private boolean resourceNotReleased = true;
+ private boolean allocationNotFired = true;
@Inject
private EvaluatorManager(
+ @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
+ @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
+ @Parameter(EvaluatorConfigurationProviders.class)
+ final Set<ConfigurationProvider> evaluatorConfigurationProviders,
final Clock clock,
final RemoteManager remoteManager,
final ResourceReleaseHandler resourceReleaseHandler,
final ResourceLaunchHandler resourceLaunchHandler,
- @Parameter(EvaluatorIdentifier.class) final String evaluatorId,
- @Parameter(EvaluatorDescriptorName.class) final EvaluatorDescriptorImpl evaluatorDescriptor,
final ContextRepresenters contextRepresenters,
final ConfigurationSerializer configurationSerializer,
final EvaluatorMessageDispatcher messageDispatcher,
@@ -131,18 +130,20 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
final ExceptionCodec exceptionCodec,
final EventHandlerIdlenessSource idlenessSource,
final LoggingScopeFactory loggingScopeFactory,
- @Parameter(EvaluatorConfigurationProviders.class)
- final Set<ConfigurationProvider> evaluatorConfigurationProviders,
final DriverRestartManager driverRestartManager,
final EvaluatorIdlenessThreadPool idlenessThreadPool) {
- this.contextRepresenters = contextRepresenters;
- this.idlenessSource = idlenessSource;
+
LOG.log(Level.FINEST, "Instantiating 'EvaluatorManager' for evaluator: {0}", evaluatorId);
+
+ this.evaluatorId = evaluatorId;
+ this.evaluatorDescriptor = evaluatorDescriptor;
+ this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
+
this.clock = clock;
+ this.contextRepresenters = contextRepresenters;
+ this.idlenessSource = idlenessSource;
this.resourceReleaseHandler = resourceReleaseHandler;
this.resourceLaunchHandler = resourceLaunchHandler;
- this.evaluatorId = evaluatorId;
- this.evaluatorDescriptor = evaluatorDescriptor;
this.messageDispatcher = messageDispatcher;
this.evaluatorControlHandler = evaluatorControlHandler;
@@ -153,7 +154,6 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
this.remoteManager = remoteManager;
this.configurationSerializer = configurationSerializer;
this.loggingScopeFactory = loggingScopeFactory;
- this.evaluatorConfigurationProviders = evaluatorConfigurationProviders;
this.driverRestartManager = driverRestartManager;
this.idlenessThreadPool = idlenessThreadPool;
@@ -182,28 +182,27 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
* Fires the EvaluatorAllocatedEvent to the handlers. Can only be done once.
*/
public synchronized void fireEvaluatorAllocatedEvent() {
- if (!allocationFired && stateManager.isAllocated()) {
+
+ if (this.stateManager.isAllocated() && this.allocationNotFired) {
+
final AllocatedEvaluator allocatedEvaluator =
new AllocatedEvaluatorImpl(this,
- remoteManager.getMyIdentifier(),
- configurationSerializer,
+ this.remoteManager.getMyIdentifier(),
+ this.configurationSerializer,
getJobIdentifier(),
- loggingScopeFactory,
- evaluatorConfigurationProviders);
- LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", evaluatorId);
- messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
- allocationFired = true;
+ this.loggingScopeFactory,
+ this.evaluatorConfigurationProviders);
+
+ LOG.log(Level.FINEST, "Firing AllocatedEvaluator event for Evaluator with ID [{0}]", this.evaluatorId);
+
+ this.messageDispatcher.onEvaluatorAllocated(allocatedEvaluator);
+ this.allocationNotFired = false;
+
} else {
- LOG.log(Level.WARNING, "Evaluator allocated event fired more than once.");
+ LOG.log(Level.WARNING, "AllocatedEvaluator event fired more than once.");
}
}
- private static boolean isDoneOrFailedOrKilled(final ResourceStatusEvent resourceStatusEvent) {
- return resourceStatusEvent.getState() == State.DONE ||
- resourceStatusEvent.getState() == State.FAILED ||
- resourceStatusEvent.getState() == State.KILLED;
- }
-
@Override
public String getId() {
return this.evaluatorId;
@@ -219,58 +218,63 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
@Override
public void close() {
+
synchronized (this.evaluatorDescriptor) {
- if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
+
+ if (this.stateManager.isAvailable()) {
+
LOG.log(Level.WARNING, "Dirty shutdown of running evaluator id[{0}]", getId());
+
try {
- if (this.stateManager.isRunning()){
+
+ if (this.stateManager.isRunning()) {
+
// Killing the evaluator means that it doesn't need to send a confirmation; it just dies.
- final EvaluatorRuntimeProtocol.EvaluatorControlProto evaluatorControlProto =
+ this.sendEvaluatorControlMessage(
EvaluatorRuntimeProtocol.EvaluatorControlProto.newBuilder()
.setTimestamp(System.currentTimeMillis())
.setIdentifier(getId())
.setKillEvaluator(EvaluatorRuntimeProtocol.KillEvaluatorProto.newBuilder().build())
- .build();
- sendEvaluatorControlMessage(evaluatorControlProto);
+ .build());
+
this.stateManager.setClosing();
+
} else {
this.stateManager.setKilled();
}
+
} catch (Exception e) {
LOG.log(Level.WARNING, "Exception occurred when manager sends killing message to task.", e);
this.stateManager.setKilled();
}
}
- if (!this.isResourceReleased) {
- this.isResourceReleased = true;
+ if (this.resourceNotReleased) {
+
+ this.resourceNotReleased = false;
+
+ final ResourceReleaseEvent releaseEvent = ResourceReleaseEventImpl.newBuilder()
+ .setIdentifier(this.evaluatorId)
+ .setRuntimeName(this.getEvaluatorDescriptor().getRuntimeName())
+ .build();
+
try {
- /* We need to wait awhile before returning the container to the RM in order to
- * give the EvaluatorRuntime (and Launcher) time to cleanly exit. */
+ // We need to wait awhile before returning the container to the RM
+ // in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit.
this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
@Override
public void onNext(final Alarm alarm) {
- EvaluatorManager.this.resourceReleaseHandler.onNext(
- ResourceReleaseEventImpl.newBuilder()
- .setIdentifier(EvaluatorManager.this.evaluatorId)
- .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
- .build()
- );
+ resourceReleaseHandler.onNext(releaseEvent);
}
});
} catch (final IllegalStateException e) {
LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e);
- EvaluatorManager.this.resourceReleaseHandler.onNext(
- ResourceReleaseEventImpl.newBuilder()
- .setIdentifier(EvaluatorManager.this.evaluatorId)
- .setRuntimeName(EvaluatorManager.this.getEvaluatorDescriptor().getRuntimeName())
- .build()
- );
+ this.resourceReleaseHandler.onNext(releaseEvent);
}
}
}
- idlenessThreadPool.runCheckAsync(this);
+ this.idlenessThreadPool.runCheckAsync(this);
}
/**
@@ -278,8 +282,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
* <em>and</em> there are no messages queued or in processing.
*/
public boolean isClosed() {
- return this.messageDispatcher.isEmpty() &&
- this.stateManager.isDoneOrFailedOrKilled();
+ return this.messageDispatcher.isEmpty() && this.stateManager.isCompleted();
}
/**
@@ -310,9 +313,11 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
*/
public void onEvaluatorException(final EvaluatorException exception) {
synchronized (this.evaluatorDescriptor) {
- if (this.stateManager.isDoneOrFailedOrKilled()) {
- LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.",
- new Object[]{this.getId(), this.stateManager});
+
+ if (this.stateManager.isCompleted()) {
+ LOG.log(Level.FINE,
+ "Ignoring an exception received for Evaluator {0} which is already in state {1}.",
+ new Object[] {this.getId(), this.stateManager});
return;
}
@@ -324,6 +329,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
final Optional<FailedTask> failedTaskOptional;
if (this.task.isPresent()) {
+
final String taskId = this.task.get().getId();
final Optional<ActiveContext> evaluatorContext = Optional.empty();
final Optional<byte[]> bytes = Optional.empty();
@@ -332,13 +338,15 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
final Optional<String> description = Optional.empty();
final FailedTask failedTask =
new FailedTask(taskId, message, description, taskException, bytes, evaluatorContext);
+
failedTaskOptional = Optional.of(failedTask);
+
} else {
failedTaskOptional = Optional.empty();
}
- final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(exception, failedContextList,
- failedTaskOptional, this.evaluatorId);
+ final FailedEvaluator failedEvaluator = new FailedEvaluatorImpl(
+ exception, failedContextList, failedTaskOptional, this.evaluatorId);
if (driverRestartManager.getEvaluatorRestartState(evaluatorId).isFailedOrExpired()) {
this.messageDispatcher.onDriverRestartEvaluatorFailed(failedEvaluator);
@@ -362,18 +370,26 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto =
evaluatorHeartbeatProtoRemoteMessage.getMessage();
+
LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
synchronized (this.evaluatorDescriptor) {
- if (this.stateManager.isDoneOrFailedOrKilled()) {
- LOG.log(Level.FINE, "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.",
- new Object[]{this.getId(), this.stateManager});
+
+ if (this.stateManager.isCompleted()) {
+
+ LOG.log(Level.FINE,
+ "Ignoring a heartbeat received for Evaluator {0} which is already in state {1}.",
+ new Object[] {this.getId(), this.stateManager});
+
return;
- } else if (this.stateManager.isAllocatedOrSubmittedOrRunning()) {
- this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp());
+
+ } else if (this.stateManager.isAvailable()) {
+
+ this.sanityChecker.check(this.evaluatorId, evaluatorHeartbeatProto.getTimestamp());
final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
- final EvaluatorRestartState evaluatorRestartState = driverRestartManager.getEvaluatorRestartState(evaluatorId);
+ final EvaluatorRestartState evaluatorRestartState =
+ this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId);
/*
* First message from a running evaluator. The evaluator can be a new evaluator or be a previous evaluator
@@ -396,7 +412,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
if (evaluatorRestartState == EvaluatorRestartState.REPORTED) {
- driverRestartManager.setEvaluatorReregistered(evaluatorId);
+ this.driverRestartManager.setEvaluatorReregistered(this.evaluatorId);
}
}
}
@@ -406,8 +422,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
// Process the Evaluator status message
if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
- EvaluatorStatusPOJO evaluatorStatus = new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus());
- this.onEvaluatorStatusMessage(evaluatorStatus);
+ this.onEvaluatorStatusMessage(new EvaluatorStatusPOJO(evaluatorHeartbeatProto.getEvaluatorStatus()));
}
// Process the Context status message(s)
@@ -417,14 +432,13 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
contextStatusList.add(new ContextStatusPOJO(proto, messageSequenceNumber));
}
- this.contextRepresenters.onContextStatusMessages(contextStatusList,
- informClientOfNewContexts);
+ this.contextRepresenters.onContextStatusMessages(contextStatusList, informClientOfNewContexts);
// Process the Task status message
if (evaluatorHeartbeatProto.hasTaskStatus()) {
- TaskStatusPOJO taskStatus = new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber);
- this.onTaskStatusMessage(taskStatus);
+ this.onTaskStatusMessage(new TaskStatusPOJO(evaluatorHeartbeatProto.getTaskStatus(), messageSequenceNumber));
}
+
LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId());
}
}
@@ -461,7 +475,9 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
* @param message
*/
private synchronized void onEvaluatorDone(final EvaluatorStatusPOJO message) {
+
assert message.getState() == State.DONE;
+
LOG.log(Level.FINEST, "Evaluator {0} done.", getId());
// Send an ACK to the Evaluator.
@@ -474,7 +490,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
this.stateManager.setDone();
this.messageDispatcher.onEvaluatorCompleted(new CompletedEvaluatorImpl(this.evaluatorId));
- close();
+
+ this.close();
}
/**
@@ -483,22 +500,24 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
* @param evaluatorStatus
*/
private synchronized void onEvaluatorFailed(final EvaluatorStatusPOJO evaluatorStatus) {
- assert evaluatorStatus.getState()
- == State.FAILED;
+
+ assert evaluatorStatus.getState() == State.FAILED;
+
final EvaluatorException evaluatorException;
+
if (evaluatorStatus.hasError()) {
+
final Optional<Throwable> exception =
this.exceptionCodec.fromBytes(evaluatorStatus.getError());
- if (exception.isPresent()) {
- evaluatorException = new EvaluatorException(getId(), exception.get());
- } else {
- evaluatorException = new EvaluatorException(getId(),
- new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError()));
- }
+
+ evaluatorException = new EvaluatorException(getId(), exception.isPresent() ? exception.get() :
+ new NonSerializableException("Exception sent, but can't be deserialized", evaluatorStatus.getError()));
+
} else {
evaluatorException = new EvaluatorException(getId(), new Exception("No exception sent"));
}
- onEvaluatorException(evaluatorException);
+
+ this.onEvaluatorException(evaluatorException);
}
/**
@@ -507,8 +526,10 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
* @param message
*/
private synchronized void onEvaluatorKilled(final EvaluatorStatusPOJO message) {
+
assert message.getState() == State.KILLED;
- assert stateManager.isClosing();
+ assert this.stateManager.isClosing();
+
LOG.log(Level.WARNING, "Evaluator {0} killed completely.", getId());
this.stateManager.setKilled();
@@ -519,9 +540,9 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
if (this.stateManager.isAllocated()) {
this.stateManager.setSubmitted();
this.resourceLaunchHandler.onNext(resourceLaunchEvent);
- } else if (this.stateManager.isFailedOrKilled()) {
- LOG.log(Level.WARNING, "Evaluator manager expected" + EvaluatorState.ALLOCATED +
- " state but instead is in state " + this.stateManager);
+ } else if (this.stateManager.isCompletedAbnormally()) {
+ LOG.log(Level.WARNING, "Evaluator manager expected {0} state but instead is in state {1}",
+ new Object[] {EvaluatorState.ALLOCATED, this.stateManager});
} else {
throw new RuntimeException("Evaluator manager expected " + EvaluatorState.ALLOCATED +
" state but instead is in state " + this.stateManager);
@@ -560,18 +581,17 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
private void onTaskStatusMessage(final TaskStatusPOJO taskStatus) {
if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatus.getTaskId()))) {
- if (taskStatus.getState() == State.INIT ||
- taskStatus.getState() == State.FAILED ||
- taskStatus.getState() == State.RUNNING ||
- driverRestartManager.getEvaluatorRestartState(evaluatorId) == EvaluatorRestartState.REREGISTERED) {
+
+ final State state = taskStatus.getState();
+ if (state.isRestartable() ||
+ this.driverRestartManager.getEvaluatorRestartState(this.evaluatorId).isReregistered()) {
// [REEF-308] exposes a bug where the .NET evaluator does not send its states in the right order
// [REEF-289] is a related item which may fix the issue
- if (taskStatus.getState() == State.RUNNING) {
+ if (state.isRunning()) {
LOG.log(Level.WARNING,
- "Received a message of state " + ReefServiceProtos.State.RUNNING +
- " for Task " + taskStatus.getTaskId() +
- " before receiving its " + ReefServiceProtos.State.INIT + " state");
+ "Received a message of state {0} for Task {1} before receiving its {2} state",
+ new Object[] {State.RUNNING, taskStatus.getTaskId(), State.INIT});
}
// FAILED is a legal first state of a Task as it could have failed during construction.
@@ -583,11 +603,12 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
this.exceptionCodec,
this.driverRestartManager));
} else {
- throw new RuntimeException("Received a message of state " + taskStatus.getState() +
+ throw new RuntimeException("Received a message of state " + state +
", not INIT, RUNNING, or FAILED for Task " + taskStatus.getTaskId() +
" which we haven't heard from before.");
}
}
+
this.task.get().onTaskStatusMessage(taskStatus);
if (this.task.get().isNotRunning()) {
@@ -600,20 +621,28 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
* Resource status information from the (actual) resource manager.
*/
public void onResourceStatusMessage(final ResourceStatusEvent resourceStatusEvent) {
+
synchronized (this.evaluatorDescriptor) {
- LOG.log(Level.FINEST, "Resource manager state update: {0}", resourceStatusEvent.getState());
- if (!this.stateManager.isAllocatedOrSubmittedOrRunning()) {
- LOG.log(Level.FINE, "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
- new Object[]{this.getId(), this.stateManager});
- } else if (isDoneOrFailedOrKilled(resourceStatusEvent) && this.stateManager.isAllocatedOrSubmittedOrRunning()) {
- // something is wrong. The resource manager reports that the Evaluator is done or failed, but the Driver assumes
- // it to be alive.
+
+ final State state = resourceStatusEvent.getState();
+ LOG.log(Level.FINEST, "Resource manager state update: {0}", state);
+
+ if (!this.stateManager.isAvailable()) {
+
+ LOG.log(Level.FINE,
+ "Ignoring resource status update for Evaluator {0} which is already in state {1}.",
+ new Object[] {this.getId(), this.stateManager});
+
+ } else if (state.isCompleted() && this.stateManager.isAvailable()) {
+
+ // Something is wrong. The resource manager reports that the Evaluator is done or failed,
+ // but the Driver assumes it to be alive.
final StringBuilder messageBuilder = new StringBuilder("Evaluator [")
.append(this.evaluatorId)
.append("] is assumed to be in state [")
.append(this.stateManager.toString())
.append("]. But the resource manager reports it to be in state [")
- .append(resourceStatusEvent.getState())
+ .append(state)
.append("].");
if (this.stateManager.isSubmitted()) {
@@ -626,6 +655,7 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
messageBuilder.append(" This means that the Evaluator failed but wasn't able to send an error message " +
"back to the driver.");
}
+
if (this.task.isPresent()) {
messageBuilder.append(" Task [")
.append(this.task.get().getId())
@@ -633,8 +663,8 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable {
}
if (resourceStatusEvent.getState() == State.KILLED) {
- this.onEvaluatorException(new EvaluatorKilledByResourceManagerException(this.evaluatorId,
- messageBuilder.toString()));
+ this.onEvaluatorException(
+ new EvaluatorKilledByResourceManagerException(this.evaluatorId, messageBuilder.toString()));
} else {
this.onEvaluatorException(new EvaluatorException(this.evaluatorId, messageBuilder.toString()));
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
index 653486e..6ee80a8 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorState.java
@@ -22,17 +22,155 @@ import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
/**
- * Various states that the EvaluatorManager could be in. The EvaluatorManager is
- * created when a resource has been allocated by the ResourceManager.
+ * Various states that the EvaluatorManager could be in.
+ * The EvaluatorManager is created when a resource has been allocated by the ResourceManager.
*/
@DriverSide
@Private
enum EvaluatorState {
- ALLOCATED, // initial state
- SUBMITTED, // client called AllocatedEvaluator.submitTask() and we're waiting for first contact
- RUNNING, // first contact received, all communication channels established, Evaluator sent to client.
- CLOSING, // evaluator is asked shutdown, but not closed yet.
- DONE, // clean shutdown
- FAILED, // some failure occurred.
- KILLED // unclean shutdown
+
+ /** Initial state. */
+ ALLOCATED,
+
+ /** Client called AllocatedEvaluator.submitTask() and we're waiting for first contact. */
+ SUBMITTED,
+
+ /** First contact received, all communication channels established, Evaluator sent to client. */
+ RUNNING,
+
+ /** Evaluator is asked to shut down, but has not closed yet. */
+ CLOSING,
+
+ /** Clean shutdown. */
+ DONE,
+
+ /** Some failure occurred. */
+ FAILED,
+
+ /** Unclean shutdown. */
+ KILLED;
+
+ /**
+ * Check if evaluator is in the initial state (ALLOCATED).
+ * @return true if ALLOCATED, false otherwise.
+ */
+ public final boolean isAllocated() {
+ return this == ALLOCATED;
+ }
+
+ /**
+ * Check if evaluator is in SUBMITTED state.
+ * @return true if SUBMITTED, false otherwise.
+ */
+ public final boolean isSubmitted() {
+ return this == SUBMITTED;
+ }
+
+ /**
+ * Check if the evaluator is in running state.
+ * @return true if RUNNING, false otherwise.
+ */
+ public final boolean isRunning() {
+ return this == RUNNING;
+ }
+
+ /**
+ * Check if the evaluator is in the process of being shut down.
+ * @return true if evaluator is being closed, false otherwise.
+ */
+ public final boolean isClosing() {
+ return this == CLOSING;
+ }
+
+ /**
+ * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, or RUNNING).
+ * @return true if evaluator is available, false if it is closed or in the process of being shut down.
+ */
+ public final boolean isAvailable() {
+ return this == ALLOCATED || this == SUBMITTED || this == RUNNING;
+ }
+
+ /**
+ * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, or KILLED states.
+ * @return true if evaluator completed, false if it is still available or in the process of being shut down.
+ */
+ public final boolean isCompleted() {
+ return this == DONE || this == FAILED || this == KILLED;
+ }
+
+ /**
+ * Check if the evaluator is closed due to an error. That is, in FAILED or KILLED state.
+ * @return true if evaluator is stopped due to an error, false otherwise.
+ */
+ public final boolean isCompletedAbnormally() {
+ return this == FAILED || this == KILLED;
+ }
+
+ /**
+ * Check if transition from current state to the given one is legal.
+ * @param toState new state to transition to.
+ * @return true if transition is legal, false otherwise.
+ */
+ public final boolean isLegalTransition(final EvaluatorState toState) {
+
+ if (this == toState) {
+ return true;
+ }
+
+ switch(this) {
+
+ case ALLOCATED:
+ switch(toState) {
+ case SUBMITTED:
+ case CLOSING:
+ case DONE:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+
+ case SUBMITTED:
+ switch(toState) {
+ case RUNNING:
+ case CLOSING:
+ case DONE:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+
+ case RUNNING:
+ switch(toState) {
+ case CLOSING:
+ case DONE:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+
+ case CLOSING:
+ switch(toState) {
+ case DONE:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+
+ case DONE:
+ case FAILED:
+ case KILLED:
+ return false;
+
+ default:
+ throw new RuntimeException("Unknown state: " + this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
index a2b249a..bb78dbe 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorStatusManager.java
@@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import javax.inject.Inject;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -31,160 +32,159 @@ import java.util.logging.Logger;
@DriverSide
@Private
final class EvaluatorStatusManager {
+
private static final Logger LOG = Logger.getLogger(EvaluatorStatusManager.class.getName());
+
/**
* The state managed.
*/
- private EvaluatorState state = EvaluatorState.ALLOCATED;
+ private final AtomicReference<EvaluatorState> state = new AtomicReference<>(EvaluatorState.ALLOCATED);
@Inject
private EvaluatorStatusManager() {
LOG.log(Level.FINE, "Instantiated 'EvaluatorStatusManager'");
}
- private static boolean isLegal(final EvaluatorState from, final EvaluatorState to) {
- if (from == to) {
- return true;
- }
-
- switch(from) {
- case ALLOCATED: {
- switch(to) {
- case SUBMITTED:
- case DONE:
- case CLOSING:
- case FAILED:
- return true;
- case KILLED:
- case RUNNING:
- break;
- default:
- throw new RuntimeException("Unknown state: " + to);
- }
- }
- case SUBMITTED: {
- switch(to) {
- case RUNNING:
- case DONE:
- case CLOSING:
- case FAILED:
- return true;
- case ALLOCATED:
- case KILLED:
- break;
- default:
- throw new RuntimeException("Unknown state: " + to);
- }
- }
- case RUNNING: {
- switch(to) {
- case DONE:
- case CLOSING:
- case FAILED:
- return true;
- case ALLOCATED:
- case SUBMITTED:
- case KILLED:
- break;
- default:
- throw new RuntimeException("Unknown state: " + to);
- }
- }
- case CLOSING: {
- switch(to) {
- case KILLED:
- case DONE:
- case FAILED:
- return true;
- case ALLOCATED:
- case SUBMITTED:
- case RUNNING:
- break;
- default:
- throw new RuntimeException("Unknown state: " + to);
- }
- }
- case DONE:
- case FAILED:
- case KILLED:
- break;
- default:
- throw new RuntimeException("Unknown state: " + from);
- }
-
- LOG.warning("Illegal evaluator state transition from " + from + " to " + to + ".");
- return false;
- }
-
- private static boolean isDoneOrFailedOrKilled(final EvaluatorState state) {
- return state == EvaluatorState.DONE ||
- state == EvaluatorState.FAILED ||
- state == EvaluatorState.KILLED;
- }
-
- synchronized void setRunning() {
+ void setRunning() {
this.setState(EvaluatorState.RUNNING);
}
- synchronized void setSubmitted() {
+ void setSubmitted() {
this.setState(EvaluatorState.SUBMITTED);
}
- synchronized void setClosing() {
+ void setClosing() {
this.setState(EvaluatorState.CLOSING);
}
- synchronized void setDone() {
+ void setDone() {
this.setState(EvaluatorState.DONE);
}
- synchronized void setFailed() {
+ void setFailed() {
this.setState(EvaluatorState.FAILED);
}
- synchronized void setKilled() {
+ void setKilled() {
this.setState(EvaluatorState.KILLED);
}
- synchronized boolean isRunning() {
- return this.state.equals(EvaluatorState.RUNNING);
+ /**
+ * Check if evaluator is in the initial state (ALLOCATED).
+ * @return true if allocated, false otherwise.
+ */
+ boolean isAllocated() {
+ return this.state.get().isAllocated();
+ }
+
+ /**
+ * Check if evaluator is in SUBMITTED state.
+ * @return true if submitted, false otherwise.
+ */
+ boolean isSubmitted() {
+ return this.state.get().isSubmitted();
+ }
+
+ /**
+ * Check if the evaluator is in running state.
+ * @return true if RUNNING, false otherwise.
+ */
+ boolean isRunning() {
+ return this.state.get().isRunning();
}
- synchronized boolean isDoneOrFailedOrKilled() {
- return isDoneOrFailedOrKilled(this.state);
+ /**
+ * Check if the evaluator is in the process of being shut down.
+ * @return true if evaluator is being closed, false otherwise.
+ */
+ boolean isClosing() {
+ return this.state.get().isClosing();
}
- synchronized boolean isAllocatedOrSubmittedOrRunning() {
- return this.state == EvaluatorState.ALLOCATED ||
- this.state == EvaluatorState.SUBMITTED ||
- this.state == EvaluatorState.RUNNING;
+ /**
+ * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, or RUNNING).
+ * @return true if evaluator is available, false if it is closed or in the process of being shut down.
+ * @deprecated TODO[JIRA REEF-1560] Use isAvailable() method instead. Remove after version 0.16
+ */
+ @Deprecated
+ boolean isAllocatedOrSubmittedOrRunning() {
+ return this.state.get().isAvailable();
}
- synchronized boolean isSubmitted() {
- return EvaluatorState.SUBMITTED == this.state;
+ /**
+ * Check if evaluator is in one of the active states (ALLOCATED, SUBMITTED, or RUNNING).
+ * @return true if evaluator is available, false if it is closed or in the process of being shut down.
+ */
+ boolean isAvailable() {
+ return this.state.get().isAvailable();
}
- synchronized boolean isAllocated() {
- return EvaluatorState.ALLOCATED == this.state;
+ /**
+ * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, or KILLED states.
+ * @return true if evaluator completed, false if it is still available or in the process of being shut down.
+ * @deprecated TODO[JIRA REEF-1560] Use isCompleted() method instead. Remove after version 0.16
+ */
+ @Deprecated
+ boolean isDoneOrFailedOrKilled() {
+ return this.state.get().isCompleted();
}
- synchronized boolean isFailedOrKilled() {
- return EvaluatorState.FAILED == this.state || EvaluatorState.KILLED == this.state;
+ /**
+ * Check if the evaluator is stopped. That is, in one of the DONE, FAILED, or KILLED states.
+ * @return true if evaluator completed, false if it is still available or in the process of being shut down.
+ */
+ boolean isCompleted() {
+ return this.state.get().isCompleted();
+ }
+
+ /**
+ * Check if the evaluator is closed due to an error. That is, in FAILED or KILLED state.
+ * @return true if evaluator is stopped due to an error, false otherwise.
+ * @deprecated TODO[JIRA REEF-1560] Use isCompletedAbnormally() method instead. Remove after version 0.16
+ */
+ @Deprecated
+ boolean isFailedOrKilled() {
+ return this.state.get().isCompletedAbnormally();
}
- synchronized boolean isClosing() {
- return EvaluatorState.CLOSING == this.state;
+ /**
+ * Check if the evaluator is closed due to an error. That is, in FAILED or KILLED state.
+ * @return true if evaluator is stopped due to an error, false otherwise.
+ */
+ boolean isCompletedAbnormally() {
+ return this.state.get().isCompletedAbnormally();
}
+ /**
+ * Return string representation of the current state of the Evaluator, like RUNNING or DONE.
+ * @return string representation of the current state of the Evaluator.
+ */
@Override
- public synchronized String toString() {
- return this.state.toString();
+ public String toString() {
+ return this.state.get().toString();
}
- private synchronized void setState(final EvaluatorState state) {
- if (!isLegal(this.state, state)) {
- throw new IllegalStateException("Illegal state transition from '" + this.state + "' to '" + state + "'");
+ /**
+ * Transition to the new state of the evaluator, if possible.
+ * @param toState New state of the evaluator.
+ * @throws IllegalStateException if state transition is not valid.
+ */
+ private void setState(final EvaluatorState toState) {
+ while (true) {
+
+ final EvaluatorState fromState = this.state.get();
+ if (fromState == toState) {
+ break;
+ }
+
+ if (!fromState.isLegalTransition(toState)) {
+ LOG.log(Level.WARNING, "Illegal state transition: {0} -> {1}", new Object[] {fromState, toState});
+ throw new IllegalStateException("Illegal state transition: " + fromState + " -> " + toState);
+ }
+
+ if (this.state.compareAndSet(fromState, toState)) {
+ break;
+ }
}
- this.state = state;
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
index 661369c..0eff9bd 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/EvaluatorStatusPOJO.java
@@ -35,55 +35,31 @@ public final class EvaluatorStatusPOJO {
private final State evaluatorState;
private final byte[] errorBytes;
-
public EvaluatorStatusPOJO(final ReefServiceProtos.EvaluatorStatusProto proto) {
-
- evaluatorID = proto.getEvaluatorId();
- evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray();
- evaluatorState = proto.hasState()? getStateFromProto(proto.getState()) : null;
- errorBytes = proto.hasError() ? proto.getError().toByteArray() : null;
-
+ this.evaluatorID = proto.getEvaluatorId();
+ this.evaluatorIdBytes = proto.getEvaluatorIdBytes().toByteArray();
+ this.evaluatorState = proto.hasState() ? State.fromProto(proto.getState()) : null;
+ this.errorBytes = proto.hasError() ? proto.getError().toByteArray() : null;
}
/**
- * @return true, if an evaluator has thrown an exception and sent it to a driver
+ * @return true, if an evaluator has thrown an exception and sent it to a driver.
*/
public boolean hasError() {
- return null != errorBytes;
+ return null != this.errorBytes;
}
/**
- * @return serialized exception thrown by an evaluator
+ * @return serialized exception thrown by an evaluator.
*/
- public byte[] getError(){
- return errorBytes;
+ public byte[] getError() {
+ return this.errorBytes;
}
/**
- * @return current {@link org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task
+ * @return current state of an evaluator.
*/
- public State getState(){
- return evaluatorState;
+ public State getState() {
+ return this.evaluatorState;
}
-
- private State getStateFromProto(final org.apache.reef.proto.ReefServiceProtos.State protoState) {
-
- switch (protoState) {
- case INIT:
- return State.INIT;
- case RUNNING:
- return State.RUNNING;
- case DONE:
- return State.DONE;
- case SUSPEND:
- return State.SUSPEND;
- case FAILED:
- return State.FAILED;
- case KILLED:
- return State.KILLED;
- default:
- throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto");
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
index 477564c..6ea7cb1 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/State.java
@@ -21,20 +21,127 @@ package org.apache.reef.runtime.common.driver.evaluator.pojos;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.proto.ReefServiceProtos;
/**
* DriverSide representation of ReefServiceProtos.State.
*/
-
@DriverSide
@Private
public enum State {
- INIT,
- RUNNING,
- DONE,
- SUSPEND,
- FAILED,
- KILLED;
+ INIT,
+ RUNNING,
+ SUSPEND,
+ DONE,
+ FAILED,
+ KILLED;
+
+ /**
+ * Get a driver-side state given the proto. It is a 1:1 mapping.
+ * @param protoState remote state from the proto.
+ * @return a corresponding (identical) driver-side state (always a 1:1 mapping).
+ */
+ public static State fromProto(final ReefServiceProtos.State protoState) {
+ switch (protoState) {
+ case INIT:
+ return INIT;
+ case RUNNING:
+ return RUNNING;
+ case SUSPEND:
+ return SUSPEND;
+ case DONE:
+ return DONE;
+ case FAILED:
+ return FAILED;
+ case KILLED:
+ return KILLED;
+ default:
+ throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto");
+ }
+ }
+
+ /**
+ * Checks if the ResourceManager can switch from the current state to the target state.
+ * See REEF-826 for the state transition matrix.
+ * @param toState state to switch to.
+ * @return true if the transition is legal; false otherwise.
+ */
+ public final boolean isLegalTransition(final State toState) {
+
+ if (this == toState) {
+ return true;
+ }
+
+ switch (this) {
+
+ case INIT:
+ switch (toState) {
+ case RUNNING:
+ case SUSPEND:
+ case DONE:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+
+ case RUNNING:
+ switch (toState) {
+ case SUSPEND:
+ case DONE:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+
+ case SUSPEND:
+ switch (toState) {
+ case RUNNING:
+ case FAILED:
+ case KILLED:
+ return true;
+ default:
+ return false;
+ }
+
+ default:
+ return false;
+ }
+ }
+
+ /**
+ * Check if container is in RUNNING state.
+ * @return true if container is running.
+ */
+ public final boolean isRunning() {
+ return this == RUNNING;
+ }
+
+ /**
+ * Check if container is available - that is, in one of the states INIT, RUNNING, or SUSPEND.
+ * @return true if container is available, false if it is closed or in the process of being shut down.
+ */
+ public final boolean isAvailable() {
+ return this == INIT || this == RUNNING || this == SUSPEND;
+ }
+
+ /**
+ * Check if the container is stopped. That is, in one of the DONE, FAILED, or KILLED states.
+ * @return true if the container is completed, false if it is still available or suspended.
+ */
+ public final boolean isCompleted() {
+ return this == DONE || this == FAILED || this == KILLED;
+ }
+ /**
+ * Check if the container can be restarted. That is, in one of the INIT, RUNNING, or FAILED states.
+ * @return true if the container can be restarted.
+ */
+ public final boolean isRestartable() {
+ return this == INIT || this == RUNNING || this == FAILED;
+ }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
index f2e3f2d..3eb4e23 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/pojos/TaskStatusPOJO.java
@@ -40,79 +40,57 @@ public final class TaskStatusPOJO {
private final byte[] result;
private final List<TaskMessagePOJO> taskMessages = new ArrayList<>();
- public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final long sequenceNumber){
+ public TaskStatusPOJO(final ReefServiceProtos.TaskStatusProto proto, final long sequenceNumber) {
- taskId = proto.getTaskId();
- contextId = proto.getContextId();
- state = proto.hasState()? getStateFromProto(proto.getState()) : null;
- result = proto.hasResult() ? proto.getResult().toByteArray() : null;
+ this.taskId = proto.getTaskId();
+ this.contextId = proto.getContextId();
+ this.state = proto.hasState() ? State.fromProto(proto.getState()) : null;
+ this.result = proto.hasResult() ? proto.getResult().toByteArray() : null;
for (final TaskMessageProto taskMessageProto : proto.getTaskMessageList()) {
- taskMessages.add(new TaskMessagePOJO(taskMessageProto, sequenceNumber));
+ this.taskMessages.add(new TaskMessagePOJO(taskMessageProto, sequenceNumber));
}
-
}
/**
- * @return a list of messages sent by a task
+ * @return a list of messages sent by a task.
*/
- public List<TaskMessagePOJO> getTaskMessageList(){
- return taskMessages;
+ public List<TaskMessagePOJO> getTaskMessageList() {
+ return this.taskMessages;
}
/**
- * @return true, if a completed task returned a non-null value in the 'return' statement
+ * @return true, if a completed task returned a non-null value in the 'return' statement.
*/
- public boolean hasResult(){
- return null != result;
+ public boolean hasResult() {
+ return null != this.result;
}
/**
- * @return serialized result that a completed task returned to the Driver
+ * @return serialized result that a completed task returned to the Driver.
*/
- public byte[] getResult(){
- return result;
+ public byte[] getResult() {
+ return this.result;
}
/**
- * @return the id of a task
+ * @return the id of a task.
*/
- public String getTaskId(){
- return taskId;
+ public String getTaskId() {
+ return this.taskId;
}
/**
- * @return the id of a context that this task runs within
+ * @return the id of a context that this task runs within.
*/
- public String getContextId(){
- return contextId;
+ public String getContextId() {
+ return this.contextId;
}
/**
- * @return current {@link org.apache.reef.runtime.common.driver.evaluator.pojos.State} of a task
+ * @return current state of a task.
*/
- public State getState(){
- return state;
- }
-
- private State getStateFromProto(final org.apache.reef.proto.ReefServiceProtos.State protoState) {
-
- switch (protoState) {
- case INIT:
- return State.INIT;
- case RUNNING:
- return State.RUNNING;
- case DONE:
- return State.DONE;
- case SUSPEND:
- return State.SUSPEND;
- case FAILED:
- return State.FAILED;
- case KILLED:
- return State.KILLED;
- default:
- throw new IllegalStateException("Unknown state " + protoState + " in EvaluatorStatusProto");
- }
-
+ public State getState() {
+ return this.state;
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
index 285a1b1..2792790 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/idle/DriverIdleManager.java
@@ -56,7 +56,7 @@ public final class DriverIdleManager {
final DriverStatusManager driverStatusManagerImpl = this.driverStatusManager.get();
- if (driverStatusManagerImpl.isShuttingDownOrFailing()) {
+ if (driverStatusManagerImpl.isClosing()) {
LOG.log(IDLE_REASONS_LEVEL, "Ignoring idle call from [{0}] for reason [{1}]",
new Object[] {reason.getComponentName(), reason.getReason()});
return;
http://git-wip-us.apache.org/repos/asf/reef/blob/719e6445/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
index f9a526d..4b19330 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/resourcemanager/ResourceManagerStatus.java
@@ -154,73 +154,15 @@ public final class ResourceManagerStatus implements EventHandler<RuntimeStatusEv
}
}
- /**
- * Checks if the ResourceManager can switch from the current state to the target state.
- * See REEF-826 for the state transition matrix.
- * @param from current state.
- * @param to state to switch to.
- * @return true if the transition is legal; false otherwise.
- */
- private static boolean isLegalStateTransition(final State from, final State to) {
-
- // handle diagonal elements of the transition matrix
- if (from.equals(to)) {
- LOG.log(Level.FINEST, "Transition from {0} state to the same state.", from);
- return true;
- }
-
- // handle non-diagonal elements
- switch (from) {
-
- case INIT:
- switch (to) {
- case RUNNING:
- case SUSPEND:
- case DONE:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
- }
-
- case RUNNING:
- switch (to) {
- case SUSPEND:
- case DONE:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
- }
-
- case SUSPEND:
- switch (to) {
- case RUNNING:
- case FAILED:
- case KILLED:
- return true;
- default:
- return false;
- }
-
- case DONE:
- case FAILED:
- case KILLED:
- return false;
-
- default:
- return false;
- }
- }
-
- private synchronized void setState(final State newState) {
- if (isLegalStateTransition(this.state, newState)) {
- this.state = newState;
+ private synchronized void setState(final State toState) {
+ if (this.state == toState) {
+ LOG.log(Level.FINE, "Transition from {0} state to the same state.", this.state);
+ } else if (this.state.isLegalTransition(toState)) {
+ LOG.log(Level.FINEST, "State transition: {0} -> {1}", new State[] {this.state, toState});
+ this.state = toState;
} else {
throw new IllegalStateException(
- "Resource manager attempts illegal state transition from " + this.state + " to " + newState);
+ "Resource manager attempts illegal state transition from " + this.state + " to " + toState);
}
}
}