You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/20 00:46:30 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-963] Remove
duplicated copies of TaskContext/TaskState when constructing
TaskIFaceWrapper
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2ec0cde [GOBBLIN-963] Remove duplicated copies of TaskContext/TaskState when constructing TaskIFaceWrapper
2ec0cde is described below
commit 2ec0cde77870a24e2efba5acc9529fbd30ec0710
Author: Chen Guo <al...@gmail.com>
AuthorDate: Tue Nov 19 16:46:23 2019 -0800
[GOBBLIN-963] Remove duplicated copies of TaskContext/TaskState when constructing TaskIFaceWrapper
Closes #2818 from enjoyear/GOBBLIN-963
---
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 45 +++++++++++++---------
1 file changed, 26 insertions(+), 19 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index fc98beb..ebd3be9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -38,6 +38,9 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import lombok.Setter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
@@ -60,9 +63,6 @@ import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
-import javax.annotation.Nullable;
-import lombok.Setter;
-
/**
* Attempt of running multiple {@link Task}s generated from a list of{@link WorkUnit}s.
@@ -79,6 +79,7 @@ public class GobblinMultiTaskAttempt {
* Commit {@link GobblinMultiTaskAttempt} immediately after running is done.
*/
IMMEDIATE,
+
/**
* Not committing {@link GobblinMultiTaskAttempt} but leaving it to user customized launcher.
*/
@@ -129,7 +130,8 @@ public class GobblinMultiTaskAttempt {
* @throws IOException
* @throws InterruptedException
*/
- public void run() throws IOException, InterruptedException {
+ public void run()
+ throws IOException, InterruptedException {
if (!this.workUnits.hasNext()) {
log.warn("No work units to run in container " + containerIdOptional.or(""));
return;
@@ -158,7 +160,8 @@ public class GobblinMultiTaskAttempt {
log.info("All assigned tasks of job {} have completed in container {}", jobId, containerIdOptional.or(""));
}
- private void interruptTaskExecution(CountDownLatch countDownLatch) throws InterruptedException {
+ private void interruptTaskExecution(CountDownLatch countDownLatch)
+ throws InterruptedException {
log.info("Job interrupted. Attempting a graceful shutdown of the job.");
this.tasks.forEach(Task::shutdown);
if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
@@ -176,7 +179,8 @@ public class GobblinMultiTaskAttempt {
* 3. persist task statestore.
* @throws IOException
*/
- public void commit() throws IOException {
+ public void commit()
+ throws IOException {
if (this.tasks == null || this.tasks.isEmpty()) {
log.warn("No tasks to be committed in container " + containerIdOptional.or(""));
return;
@@ -188,7 +192,8 @@ public class GobblinMultiTaskAttempt {
return new Callable<Void>() {
@Nullable
@Override
- public Void call() throws Exception {
+ public Void call()
+ throws Exception {
task.commit();
return null;
}
@@ -199,8 +204,8 @@ public class GobblinMultiTaskAttempt {
try {
List<Either<Void, ExecutionException>> executionResults =
new IteratorExecutor<>(callableIterator, this.getTaskCommitThreadPoolSize(),
- ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
- Optional.of("Task-committing-pool-%d"))).executeAndGetResults();
+ ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Task-committing-pool-%d")))
+ .executeAndGetResults();
IteratorExecutor.logFailures(executionResults, log, 10);
} catch (InterruptedException ie) {
log.error("Committing of tasks interrupted. Aborting.");
@@ -220,7 +225,8 @@ public class GobblinMultiTaskAttempt {
* A method that shuts down all running tasks managed by this instance.
* TODO: Call this from the right place.
*/
- public void shutdownTasks() throws InterruptedException {
+ public void shutdownTasks()
+ throws InterruptedException {
log.info("Shutting down tasks");
for (Task task : this.tasks) {
task.shutdown();
@@ -239,7 +245,8 @@ public class GobblinMultiTaskAttempt {
}
}
- private void persistTaskStateStore() throws IOException {
+ private void persistTaskStateStore()
+ throws IOException {
if (!this.taskStateStoreOptional.isPresent()) {
log.info("Task state store does not exist.");
return;
@@ -277,8 +284,8 @@ public class GobblinMultiTaskAttempt {
// to filter out successful tasks on subsequent attempts.
if (task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL
|| task.getTaskState().getWorkingState() == WorkUnitState.WorkingState.COMMITTED) {
- taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX,
- task.getTaskState());
+ taskStateStore
+ .put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX, task.getTaskState());
}
}
@@ -421,8 +428,8 @@ public class GobblinMultiTaskAttempt {
this.log.info("Heap Memory");
this.log.info(String.format(format, "init", "used", "Committed", "max"));
- this.log.info(String.format(format, heapMemory.getInit(), heapMemory.getUsed(), heapMemory.getCommitted(),
- heapMemory.getMax()));
+ this.log.info(String
+ .format(format, heapMemory.getInit(), heapMemory.getUsed(), heapMemory.getCommitted(), heapMemory.getMax()));
this.log.info("Non-heap Memory");
this.log.info(String.format(format, "init", "used", "Committed", "max"));
@@ -432,12 +439,12 @@ public class GobblinMultiTaskAttempt {
private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
Optional<TaskFactory> taskFactoryOpt = TaskUtils.getTaskFactory(workUnitState);
+ final TaskContext taskContext = new TaskContext(workUnitState);
if (taskFactoryOpt.isPresent()) {
- return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new TaskContext(workUnitState)),
- new TaskContext(workUnitState), countDownLatch, this.taskStateTracker);
+ return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(taskContext), taskContext, countDownLatch,
+ this.taskStateTracker);
} else {
- return new Task(new TaskContext(workUnitState), this.taskStateTracker, this.taskExecutor,
- Optional.of(countDownLatch));
+ return new Task(taskContext, this.taskStateTracker, this.taskExecutor, Optional.of(countDownLatch));
}
}