You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/02/22 01:04:58 UTC
incubator-gobblin git commit: [GOBBLIN-378] Publish tasks in successful state only
Repository: incubator-gobblin
Updated Branches:
refs/heads/master a7a85e150 -> b4597e988
[GOBBLIN-378] Publish tasks in successful state only
Closes #2253 from yukuai518/zero
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b4597e98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b4597e98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b4597e98
Branch: refs/heads/master
Commit: b4597e988c9beae18f1b4896bf0b7f36d2ea5c1f
Parents: a7a85e1
Author: Kuai Yu <ku...@linkedin.com>
Authored: Wed Feb 21 17:04:54 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Feb 21 17:04:54 2018 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/gobblin/runtime/Task.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b4597e98/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 3265ab8..c3c1b99 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -869,7 +869,9 @@ public class Task implements TaskIFace {
if (failedForkIds.size() == 0) {
// Set the task state to SUCCESSFUL. The state is not set to COMMITTED
// as the data publisher will do that upon successful data publishing.
- this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
+ if (this.taskState.getWorkingState() != WorkUnitState.WorkingState.FAILED) {
+ this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
+ }
} else {
failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId));
}
@@ -903,8 +905,10 @@ public class Task implements TaskIFace {
if (shouldPublishDataInTask()) {
// If data should be published by the task, publish the data and set the task state to COMMITTED.
// Task data can only be published after all forks have been closed by closer.close().
- publishTaskData();
- this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+ if (this.taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
+ publishTaskData();
+ this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+ }
}
} catch (IOException ioe) {
failTask(ioe);