You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/08/27 23:15:59 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1251] Propagate exception in TaskStateTracker for caller to trigger Helix retry

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f47f8df  [GOBBLIN-1251] Propagate exception in TaskStateTracker for caller to trigger Helix retry
f47f8df is described below

commit f47f8df79b315cc4337cae3ca1f4bc0eb8d9acb2
Author: Lei Sun <au...@gmail.com>
AuthorDate: Thu Aug 27 16:15:40 2020 -0700

    [GOBBLIN-1251] Propagate exception in TaskStateTracker for caller to trigger Helix retry
    
    Propagate exception in statsTracker if scheduling
    failed
    
    Add unit test
    
    Address comments and fix unit test failure in
    Travis
    
    Making fatal-handling of taskMetricSchedule
    failure configurable
    
    Address reviewer's comments
    
    Address reviewer's comments III
    
    Closes #3092 from
    autumnust/stateTrackerFailureRobustness
---
 .../cluster/GobblinHelixTaskStateTracker.java      | 35 +++++++---
 .../gobblin/cluster/GobblinHelixTaskTest.java      |  6 +-
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 16 +++--
 .../runtime/GobblinMultiTaskAttemptTest.java       | 80 ++++++++++++++++++++--
 4 files changed, 113 insertions(+), 24 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
index 7120bef..c1f4c63 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
@@ -22,12 +22,12 @@ import java.util.Properties;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 
-import org.apache.gobblin.annotation.Alpha;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.runtime.AbstractTaskStateTracker;
 import org.apache.gobblin.runtime.Task;
@@ -42,16 +42,20 @@ import org.apache.gobblin.runtime.Task;
  *
  * @author Yinan Li
  */
-@Alpha
+@Slf4j
 public class GobblinHelixTaskStateTracker extends AbstractTaskStateTracker {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixTaskStateTracker.class);
+  @VisibleForTesting
+  static final String IS_TASK_METRICS_SCHEDULING_FAILURE_FATAL = "helixTaskTracker.isNewTaskRegFailureFatal";
+  private static final String DEFAULT_TASK_METRICS_SCHEDULING_FAILURE_FATAL = "false";
 
   // Mapping between tasks and the task state reporters associated with them
   private final Map<String, ScheduledFuture<?>> scheduledReporters = Maps.newHashMap();
+  private boolean isNewTaskRegFailureFatal;
 
   public GobblinHelixTaskStateTracker(Properties properties) {
-    super(properties, LOGGER);
+    super(properties, log);
+    isNewTaskRegFailureFatal = Boolean.parseBoolean(properties.getProperty(IS_TASK_METRICS_SCHEDULING_FAILURE_FATAL,
+        DEFAULT_TASK_METRICS_SCHEDULING_FAILURE_FATAL));
   }
 
   @Override
@@ -59,7 +63,18 @@ public class GobblinHelixTaskStateTracker extends AbstractTaskStateTracker {
     try {
       this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
     } catch (RejectedExecutionException ree) {
-      LOGGER.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
+      // Propagate the exception to caller that has full control of the life-cycle of a helix task.
+      log.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
+      if (isNewTaskRegFailureFatal) {
+        Throwables.propagate(ree);
+      }
+    } catch (Throwable t) {
+      String errorMsg = "Failure occurred for scheduling task state reporter, ";
+      if (isNewTaskRegFailureFatal) {
+        throw new RuntimeException(errorMsg, t);
+      } else {
+        log.error(errorMsg, t);
+      }
     }
   }
 
@@ -83,7 +98,7 @@ public class GobblinHelixTaskStateTracker extends AbstractTaskStateTracker {
       this.scheduledReporters.remove(task.getTaskId()).cancel(false);
     }
 
-    LOGGER.info(String
+    log.info(String
         .format("Task %s completed in %dms with state %s", task.getTaskId(), task.getTaskState().getTaskDuration(),
             task.getTaskState().getWorkingState()));
   }
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index a17495e..951f883 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -32,7 +32,6 @@ import org.apache.gobblin.example.simplejson.SimpleJsonSource;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.TaskCreationException;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Id;
@@ -68,6 +67,7 @@ import com.google.common.eventbus.Subscribe;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import static org.apache.gobblin.cluster.GobblinHelixTaskStateTracker.IS_TASK_METRICS_SCHEDULING_FAILURE_FATAL;
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
 import static org.mockito.Mockito.when;
@@ -112,7 +112,9 @@ public class GobblinHelixTaskTest {
 
     this.helixManager = Mockito.mock(HelixManager.class);
     when(this.helixManager.getInstanceName()).thenReturn(GobblinHelixTaskTest.class.getSimpleName());
-    this.taskStateTracker = new GobblinHelixTaskStateTracker(new Properties());
+    Properties stateTrackerProp = new Properties();
+    stateTrackerProp.setProperty(IS_TASK_METRICS_SCHEDULING_FAILURE_FATAL, "true");
+    this.taskStateTracker = new GobblinHelixTaskStateTracker(stateTrackerProp);
 
     this.localFs = FileSystem.getLocal(configuration);
     this.appWorkDir = new Path(GobblinHelixTaskTest.class.getSimpleName());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 284413b..86fe146 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -157,9 +157,11 @@ public class GobblinMultiTaskAttempt {
     Pair<List<Task>, Boolean> executionResult = runWorkUnits(countDownLatch);
     this.tasks = executionResult.getFirst();
 
-    // Indicating task creation failure, propagating exception as it should be noticeable to job launcher
+    // Indicating task submission failure, propagating exception as it should be noticeable to job launcher.
+    // Submission failure could be task-creation failure, or state-tracker failed to be scheduled so that the actual
+    // task isn't submitted into the executor.
     if (!executionResult.getSecond()) {
-      throw new TaskCreationException("Failing in creating task before execution.");
+      throw new TaskCreationException("Failing in submitting at least one task before execution.");
     }
 
     log.info("Waiting for submitted tasks of job {} to complete in container {}...", jobId, containerIdOptional.or(""));
@@ -385,7 +387,10 @@ public class GobblinMultiTaskAttempt {
   private Pair<List<Task>, Boolean> runWorkUnits(CountUpAndDownLatch countDownLatch) {
 
     List<Task> tasks = Lists.newArrayList();
-    boolean isTaskCreatedSuccessfully = true;
+
+    // A flag indicating if there are any tasks not submitted successfully.
+    // Caller of this method should handle tasks with submission failures accordingly.
+    boolean areAllTasksSubmitted = true;
     while (this.workUnits.hasNext()) {
       WorkUnit workUnit = this.workUnits.next();
       String taskId = workUnit.getProp(ConfigurationKeys.TASK_ID_KEY);
@@ -426,7 +431,7 @@ public class GobblinMultiTaskAttempt {
         if (task == null) {
           if (e instanceof RetryException) {
             // Indicating task being null due to failure in creation even after retrying.
-            isTaskCreatedSuccessfully = false;
+            areAllTasksSubmitted = false;
           }
           // task could not be created, so directly count down
           countDownLatch.countDown();
@@ -435,6 +440,7 @@ public class GobblinMultiTaskAttempt {
           // Task was created and may have been registered, but not submitted, so call the
           // task state tracker task run completion directly since the task cancel does nothing if not submitted
           this.taskStateTracker.onTaskRunCompletion(task);
+          areAllTasksSubmitted = false;
           log.error("Could not submit task for workunit {}", workUnit, e);
         } else {
           // task was created and submitted, but failed later, so cancel the task to decrement the CountDownLatch
@@ -449,7 +455,7 @@ public class GobblinMultiTaskAttempt {
     eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(jobState, JobEvent.TASKS_SUBMITTED));
     eventSubmitterBuilder.build().submit(JobEvent.TASKS_SUBMITTED, "tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
 
-    return new Pair<>(tasks, isTaskCreatedSuccessfully);
+    return new Pair<>(tasks, areAllTasksSubmitted);
   }
 
   private void printMemoryUsage() {
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
index e40b23d..b658801 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
@@ -19,9 +19,12 @@ package org.apache.gobblin.runtime;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Properties;
 
 import org.junit.Assert;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
@@ -29,6 +32,8 @@ import com.google.common.collect.ImmutableList;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.broker.SharedResourcesBrokerImpl;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
@@ -39,8 +44,23 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIME_OUT_MS;
 
 
+@Slf4j
 public class GobblinMultiTaskAttemptTest {
   private GobblinMultiTaskAttempt taskAttempt;
+  private TaskExecutor taskExecutorMock;
+  private SharedResourcesBrokerImpl<GobblinScopeTypes> jobBroker;
+
+  @BeforeClass
+  public void setup() {
+    // Initializing jobBroker
+    Config config = ConfigFactory.empty();
+    SharedResourcesBrokerImpl<GobblinScopeTypes> topBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(config, GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    this.jobBroker = topBroker.newSubscopedBuilder(new JobScopeInstance("testJob", "job123")).build();
+
+    // Mocking task executor
+    this.taskExecutorMock = Mockito.mock(TaskExecutor.class);
+  }
 
   @Test
   public void testRunWithTaskCreationFailure()
@@ -54,12 +74,7 @@ public class GobblinMultiTaskAttemptTest {
     // Limit the number of times of retry in task-creation.
     jobState.setProp(RETRY_TIME_OUT_MS, 1000);
     TaskStateTracker stateTrackerMock = Mockito.mock(TaskStateTracker.class);
-    TaskExecutor taskExecutorMock = Mockito.mock(TaskExecutor.class);
-    Config config = ConfigFactory.empty();
-    SharedResourcesBrokerImpl<GobblinScopeTypes> topBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(config,
-        GobblinScopeTypes.GLOBAL.defaultScopeInstance());
-    SharedResourcesBrokerImpl<GobblinScopeTypes> jobBroker =
-        topBroker.newSubscopedBuilder(new JobScopeInstance("testJob", "job123")).build();
+
     taskAttempt =
         new GobblinMultiTaskAttempt(workUnit.iterator(), "testJob", jobState, stateTrackerMock, taskExecutorMock,
             Optional.absent(), Optional.absent(), jobBroker);
@@ -69,11 +84,62 @@ public class GobblinMultiTaskAttemptTest {
       // org.apache.gobblin.runtime.TaskContext.getSource
       taskAttempt.run();
     } catch (Exception e) {
-      Assert.assertTrue(e instanceof IOException);
+      Assert.assertTrue(e instanceof TaskCreationException);
+      return;
+    }
+
+    // Should never reach here.
+    Assert.fail();
+  }
+
+  @Test
+  public void testRunWithTaskStatsTrackerNotScheduledFailure()
+      throws Exception {
+    TaskStateTracker stateTracker = new FailingTestStateTracker(new Properties(), log);
+    // Preparing an instance of TaskAttempt whose state tracker is designed to fail in registerNewTask
+    WorkUnit tmpWU = new WorkUnit();
+    // Put necessary attributes in workunit
+    tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
+    List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
+    JobState jobState = new JobState();
+    // Limit the number of times of retry in task-creation.
+    jobState.setProp(RETRY_TIME_OUT_MS, 1000);
+    jobState.setProp(ConfigurationKeys.SOURCE_CLASS_KEY, DatasetStateStoreTest.DummySource.class.getName());
+
+    taskAttempt = new GobblinMultiTaskAttempt(workUnit.iterator(), "testJob", jobState, stateTracker, taskExecutorMock,
+        Optional.absent(), Optional.absent(), jobBroker);
+
+    try {
+      // This attempt will automatically fail since the registerNewTask call will directly throw RuntimeException
+      // as a way to simulate the case when scheduling reporter is rejected.
+      taskAttempt.run();
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof TaskCreationException);
       return;
     }
 
     // Should never reach here.
     Assert.fail();
   }
+
+  public static class FailingTestStateTracker extends AbstractTaskStateTracker {
+    public FailingTestStateTracker(Properties properties, Logger logger) {
+      super(properties, logger);
+    }
+
+    @Override
+    public void registerNewTask(Task task) {
+      throw new RuntimeException("Failing registering new task on purpose");
+    }
+
+    @Override
+    public void onTaskRunCompletion(Task task) {
+
+    }
+
+    @Override
+    public void onTaskCommitCompletion(Task task) {
+
+    }
+  }
 }
\ No newline at end of file