You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by we...@apache.org on 2015/10/25 19:39:28 UTC
incubator-reef git commit: [REEF-865] RunningTaskImpl should close
tasks in both INIT and RUNNING status
Repository: incubator-reef
Updated Branches:
refs/heads/master 76e4a4813 -> 09056ec51
[REEF-865] RunningTaskImpl should close tasks in both INIT and RUNNING status
This PR resolves the frequent failures of the FailTask test suite in the Docker environment.
* Add `isClosable` function to TaskRepresenter
* Change close() and close(message) to close a task with INIT or RUNNING status
* Add documentation for the FailTask test suite.
JIRA:
[REEF-865](https://issues.apache.org/jira/browse/REEF-865)
Pull Request:
This closes #584
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/09056ec5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/09056ec5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/09056ec5
Branch: refs/heads/master
Commit: 09056ec51969c08704555066aece246387cc80c5
Parents: 76e4a48
Author: Dongjoon Hyun <do...@apache.org>
Authored: Thu Oct 22 20:58:36 2015 +0900
Committer: Markus Weimer <gi...@weimo.de>
Committed: Sun Oct 25 19:36:02 2015 +0100
----------------------------------------------------------------------
.../common/driver/task/RunningTaskImpl.java | 20 ++++++++++----------
.../common/driver/task/TaskRepresenter.java | 7 +++++++
.../reef/tests/fail/task/package-info.java | 2 +-
3 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/09056ec5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
index 89a46df..76193f2 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/RunningTaskImpl.java
@@ -86,13 +86,13 @@ public final class RunningTaskImpl implements RunningTask {
public void close() {
LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() + "]");
- if (this.taskRepresenter.isNotRunning()) {
- LOG.log(Level.FINE, "Ignoring call to .close() because the task is no longer RUNNING.");
- } else {
+ if (this.taskRepresenter.isClosable()) {
final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
.setStopTask(StopTaskProto.newBuilder().build())
.build();
this.evaluatorManager.sendContextControlMessage(contextControlProto);
+ } else {
+ LOG.log(Level.FINE, "Ignoring call to .close() because the task is no longer RUNNING.");
}
}
@@ -100,15 +100,15 @@ public final class RunningTaskImpl implements RunningTask {
public void close(final byte[] message) {
LOG.log(Level.FINEST, "CLOSE: TaskRuntime id[" + taskId + "] on evaluator id[" + evaluatorManager.getId() +
"] with message.");
- if (this.taskRepresenter.isNotRunning()) {
+ if (this.taskRepresenter.isClosable()) {
+ final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
+ .setStopTask(StopTaskProto.newBuilder().build())
+ .setTaskMessage(ByteString.copyFrom(message))
+ .build();
+ this.evaluatorManager.sendContextControlMessage(contextControlProto);
+ } else {
throw new RuntimeException("Trying to send a message to a Task that is no longer RUNNING.");
}
-
- final ContextControlProto contextControlProto = ContextControlProto.newBuilder()
- .setStopTask(StopTaskProto.newBuilder().build())
- .setTaskMessage(ByteString.copyFrom(message))
- .build();
- this.evaluatorManager.sendContextControlMessage(contextControlProto);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/09056ec5/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
index 0107878..3f70817 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/task/TaskRepresenter.java
@@ -202,6 +202,13 @@ public final class TaskRepresenter {
return this.state != ReefServiceProtos.State.RUNNING;
}
+ /**
+ * @return true, if this task is in INIT or RUNNING status.
+ */
+ public boolean isClosable() {
+ return this.state == ReefServiceProtos.State.INIT || this.state == ReefServiceProtos.State.RUNNING;
+ }
+
private void setState(final ReefServiceProtos.State newState) {
LOG.log(Level.FINE, "Task [{0}] state transition from [{1}] to [{2}]",
new Object[]{this.taskId, this.state, newState});
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/09056ec5/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java
index 8d3c9fa..974749e 100644
--- a/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java
+++ b/lang/java/reef-tests/src/main/java/org/apache/reef/tests/fail/task/package-info.java
@@ -17,6 +17,6 @@
* under the License.
*/
/**
- * TODO: Document.
+ * Tests for Tasks fail in various cases.
*/
package org.apache.reef.tests.fail.task;