Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/05 07:38:08 UTC

[GitHub] [incubator-gobblin] autumnust opened a new pull request #2909: [GOBBLIN-1071] Retry task initialization

autumnust opened a new pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909
 
 
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-XXX
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r389199355
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 ##########
 @@ -44,6 +49,9 @@
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Id;
 
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
 
 Review comment:
   Addressed, thanks for catching. 


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r388397744
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 ##########
 @@ -44,6 +49,9 @@
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Id;
 
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
 
 Review comment:
   Are these imports being used anywhere?


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r388441943
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
 ##########
 @@ -50,49 +54,74 @@
 public class SingleTask {
 
   private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class);
+  public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = "maxRetryBlockedOnTaskAttemptInit";
+  public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
 
-  private GobblinMultiTaskAttempt _taskattempt;
+  private GobblinMultiTaskAttempt _taskAttempt;
   private String _jobId;
   private Path _workUnitFilePath;
   private Path _jobStateFilePath;
   private FileSystem _fs;
   private TaskAttemptBuilder _taskAttemptBuilder;
   private StateStores _stateStores;
   private Config _dynamicConfig;
+  private List<WorkUnit> _workUnits;
+  private JobState _jobState;
 
+  // Prevent Helix from calling cancel before taskAttempt is created.
+  // Checking whether taskAttempt is null is not enough: the canceller runs in a different thread from the runner,
+  // and the case to avoid here is taskAttempt being created and starting to run after cancel has been called.
+  private Condition _taskAttemptBuilt;
+  private Lock _lock;
+
+  /**
+   * Do all heavy-lifting of initialization in constructor which could be retried if failed,
+   * see the example in {@link GobblinHelixTask}.
+   */
   SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
     _jobId = jobId;
     _workUnitFilePath = workUnitFilePath;
     _jobStateFilePath = jobStateFilePath;
     _fs = fs;
     _taskAttemptBuilder = taskAttemptBuilder;
     _stateStores = stateStores;
     _dynamicConfig = dynamicConfig;
+
+    _workUnits = getWorkUnits();
+    _jobState = getJobState();
+    _lock = new ReentrantLock();
+    _taskAttemptBuilt = _lock.newCondition();
   }
 
   public void run()
       throws IOException, InterruptedException {
-    List<WorkUnit> workUnits = getWorkUnits();
 
-    JobState jobState = getJobState();
     // Add dynamic configuration to the job state
-    _dynamicConfig.entrySet().forEach(e -> jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
+    _dynamicConfig.entrySet().forEach(e -> _jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
 
-    Config jobConfig = getConfigFromJobState(jobState);
+    Config jobConfig = getConfigFromJobState(_jobState);
 
     _logger.debug("SingleTask.run: jobId {} workUnitFilePath {} jobStateFilePath {} jobState {} jobConfig {}",
-        _jobId, _workUnitFilePath, _jobStateFilePath, jobState, jobConfig);
+        _jobId, _workUnitFilePath, _jobStateFilePath, _jobState, jobConfig);
 
     try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory
         .createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
-      SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker);
+      SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(_jobState, globalBroker);
+
+      // Secure atomicity of taskAttempt's execution.
+      _lock.lock();
+      try {
+        _taskAttempt = _taskAttemptBuilder.build(_workUnits.iterator(), _jobId, _jobState, jobBroker);
+        _taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+        _taskAttemptBuilt.signal();
 
 Review comment:
   How do we handle missed signals? Are we sure the thread waiting on the signal is waiting?
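
A common way to make a `Condition` robust against missed signals is to pair it with a boolean flag that is set and checked under the same lock, so a waiter that arrives after the signal still sees the state change. The following is a minimal sketch of that pattern, not the code from this PR; `AttemptGate`, `markBuilt`, and `awaitBuilt` are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper, not part of Gobblin: a one-shot gate that tolerates missed signals.
class AttemptGate {
  private final Lock lock = new ReentrantLock();
  private final Condition builtCondition = lock.newCondition();
  private boolean built = false; // guarded by lock

  // Called by the runner thread once the attempt exists.
  void markBuilt() {
    lock.lock();
    try {
      built = true;               // record the state first, so late waiters can see it
      builtCondition.signalAll(); // then wake any thread that is already waiting
    } finally {
      lock.unlock();
    }
  }

  // Called by the canceller thread; returns true if the attempt was built within the timeout.
  boolean awaitBuilt(long timeout, TimeUnit unit) throws InterruptedException {
    long nanosLeft = unit.toNanos(timeout);
    lock.lock();
    try {
      // Loop on the flag: this covers both spurious wakeups and a signal sent before we waited.
      while (!built) {
        if (nanosLeft <= 0L) {
          return false;
        }
        nanosLeft = builtCondition.awaitNanos(nanosLeft);
      }
      return true;
    } finally {
      lock.unlock();
    }
  }
}
```

With this shape, a canceller that calls `awaitBuilt` after `markBuilt` has already run returns immediately instead of blocking until the timeout.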


[GitHub] [incubator-gobblin] sv2000 commented on issue #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on issue #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#issuecomment-596053941
 
 
   @autumnust LGTM. Once travis passes, will merge.


[GitHub] [incubator-gobblin] asfgit closed pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909
 
 
   


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r388425904
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 ##########
 @@ -97,28 +105,35 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                              builder.getAppWorkPath(),
                              this.jobId);
 
-    Config dynamicConfig = builder.getDynamicConfig()
-        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
-        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId));
-
     Integer partitionNum = getPartitionForHelixTask(taskDriver);
-
     if (partitionNum == null) {
       throw new IllegalStateException(String.format("Task %s, job %s on instance %s has no partition assigned",
           this.helixTaskId, builder.getInstanceName(), this.helixJobId));
     }
 
-    dynamicConfig = dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
-    this.task = new SingleTask(this.jobId,
-                               this.workUnitFilePath,
-                               jobStateFilePath,
-                               builder.getFs(),
-                               taskAttemptBuilder,
-                               stateStores,
-                               dynamicConfig);
+    final Config taskLevelConfig = builder.getConfig()
+        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
+        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
+
+    Retryer<SingleTask> retryer = RetryerFactory.newInstance(taskLevelConfig);
 
 Review comment:
   Also, do we need a retryer around SingleTask creation, since the constructor of SingleTask is just setting member variables?


[GitHub] [incubator-gobblin] codecov-io commented on issue #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#issuecomment-596158584
 
 
   # [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2909?src=pr&el=h1) Report
   > Merging [#2909](https://codecov.io/gh/apache/incubator-gobblin/pull/2909?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-gobblin/commit/64f9339027e4cbaec320ae7849a4f47e2d71e1c9?src=pr&el=desc) will **increase** coverage by `<.01%`.
   > The diff coverage is `66.21%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/graphs/tree.svg?width=650&token=4MgURJ0bGc&height=150&src=pr)](https://codecov.io/gh/apache/incubator-gobblin/pull/2909?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #2909      +/-   ##
   ============================================
   + Coverage     45.85%   45.86%   +<.01%     
   - Complexity     9187     9188       +1     
   ============================================
     Files          1934     1934              
     Lines         72858    72886      +28     
     Branches       8033     8035       +2     
   ============================================
   + Hits          33411    33428      +17     
   - Misses        36382    36395      +13     
   + Partials       3065     3063       -2
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-gobblin/pull/2909?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...g/apache/gobblin/cluster/InMemoryWuSingleTask.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSW5NZW1vcnlXdVNpbmdsZVRhc2suamF2YQ==) | `100% <ø> (ø)` | `3 <0> (ø)` | :arrow_down: |
   | [...he/gobblin/cluster/InMemoryWuFailedSingleTask.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSW5NZW1vcnlXdUZhaWxlZFNpbmdsZVRhc2suamF2YQ==) | `100% <ø> (ø)` | `3 <0> (ø)` | :arrow_down: |
   | [...src/main/java/org/apache/gobblin/runtime/Task.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvVGFzay5qYXZh) | `67.26% <ø> (ø)` | `84 <0> (ø)` | :arrow_down: |
   | [...org/apache/gobblin/yarn/GobblinYarnTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi15YXJuL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3lhcm4vR29iYmxpbllhcm5UYXNrUnVubmVyLmphdmE=) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [.../org/apache/gobblin/util/retry/RetryerFactory.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi11dGlsaXR5L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3V0aWwvcmV0cnkvUmV0cnllckZhY3RvcnkuamF2YQ==) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...apache/gobblin/runtime/util/RuntimeConstructs.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvdXRpbC9SdW50aW1lQ29uc3RydWN0cy5qYXZh) | `0% <0%> (ø)` | `0 <0> (ø)` | :arrow_down: |
   | [...a/org/apache/gobblin/cluster/SingleTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvU2luZ2xlVGFza1J1bm5lci5qYXZh) | `87.71% <100%> (ø)` | `11 <0> (ø)` | :arrow_down: |
   | [...pache/gobblin/runtime/GobblinMultiTaskAttempt.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvR29iYmxpbk11bHRpVGFza0F0dGVtcHQuamF2YQ==) | `57.32% <66.66%> (+0.43%)` | `28 <1> (+1)` | :arrow_up: |
   | [...ache/gobblin/cluster/InMemorySingleTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvSW5NZW1vcnlTaW5nbGVUYXNrUnVubmVyLmphdmE=) | `75% <75%> (-25%)` | `3 <1> (ø)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `65.06% <75%> (ø)` | `29 <3> (ø)` | :arrow_down: |
   | ... and [7 more](https://codecov.io/gh/apache/incubator-gobblin/pull/2909/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2909?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-gobblin/pull/2909?src=pr&el=footer). Last update [64f9339...e7066d4](https://codecov.io/gh/apache/incubator-gobblin/pull/2909?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r389199635
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 ##########
 @@ -97,28 +105,35 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                              builder.getAppWorkPath(),
                              this.jobId);
 
-    Config dynamicConfig = builder.getDynamicConfig()
-        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
-        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId));
-
     Integer partitionNum = getPartitionForHelixTask(taskDriver);
-
     if (partitionNum == null) {
       throw new IllegalStateException(String.format("Task %s, job %s on instance %s has no partition assigned",
           this.helixTaskId, builder.getInstanceName(), this.helixJobId));
     }
 
-    dynamicConfig = dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
-    this.task = new SingleTask(this.jobId,
-                               this.workUnitFilePath,
-                               jobStateFilePath,
-                               builder.getFs(),
-                               taskAttemptBuilder,
-                               stateStores,
-                               dynamicConfig);
+    final Config taskLevelConfig = builder.getConfig()
+        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
+        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
+
+    Retryer<SingleTask> retryer = RetryerFactory.newInstance(taskLevelConfig);
 
 Review comment:
   Regarding the default setting: the timeout is 5 minutes (it uses TimeUnit.MINUTES.toMillis to convert 5 minutes to ms).
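
The unit question in this exchange is easy to check: `TimeUnit.MINUTES.toMillis(5)` yields 300000 ms, so a default written that way means five minutes rather than five milliseconds. A tiny sketch of the conversion follows; the class and constant names are hypothetical and are not taken from `RetryerFactory`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical holder class; it only demonstrates the unit conversion mentioned in the comment.
class RetryTimeLimitExample {
  // 5 minutes expressed in milliseconds: 5 * 60 * 1000 = 300000.
  static final long TIME_OUT_MS = TimeUnit.MINUTES.toMillis(5L);
}
```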


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r388396992
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 ##########
 @@ -97,28 +105,35 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                              builder.getAppWorkPath(),
                              this.jobId);
 
-    Config dynamicConfig = builder.getDynamicConfig()
-        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
-        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId));
-
     Integer partitionNum = getPartitionForHelixTask(taskDriver);
-
     if (partitionNum == null) {
       throw new IllegalStateException(String.format("Task %s, job %s on instance %s has no partition assigned",
           this.helixTaskId, builder.getInstanceName(), this.helixJobId));
     }
 
-    dynamicConfig = dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
-    this.task = new SingleTask(this.jobId,
-                               this.workUnitFilePath,
-                               jobStateFilePath,
-                               builder.getFs(),
-                               taskAttemptBuilder,
-                               stateStores,
-                               dynamicConfig);
+    final Config taskLevelConfig = builder.getConfig()
+        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
+        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
+
+    Retryer<SingleTask> retryer = RetryerFactory.newInstance(taskLevelConfig);
 
 Review comment:
   Check the retryer defaults. I think 5ms for a retry attempt may be too aggressive, particularly if we are connecting to state stores as part of the initialization.


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r388438736
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
 ##########
 @@ -50,49 +54,74 @@
 public class SingleTask {
 
   private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class);
+  public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = "maxRetryBlockedOnTaskAttemptInit";
+  public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
 
-  private GobblinMultiTaskAttempt _taskattempt;
+  private GobblinMultiTaskAttempt _taskAttempt;
   private String _jobId;
   private Path _workUnitFilePath;
   private Path _jobStateFilePath;
   private FileSystem _fs;
   private TaskAttemptBuilder _taskAttemptBuilder;
   private StateStores _stateStores;
   private Config _dynamicConfig;
+  private List<WorkUnit> _workUnits;
+  private JobState _jobState;
 
+  // Prevent Helix from calling cancel before taskAttempt is created.
+  // Checking whether taskAttempt is null is not enough: the canceller runs in a different thread from the runner,
+  // and the case to avoid here is taskAttempt being created and starting to run after cancel has been called.
+  private Condition _taskAttemptBuilt;
+  private Lock _lock;
+
+  /**
+   * Do all heavy-lifting of initialization in constructor which could be retried if failed,
+   * see the example in {@link GobblinHelixTask}.
+   */
   SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
     _jobId = jobId;
     _workUnitFilePath = workUnitFilePath;
     _jobStateFilePath = jobStateFilePath;
     _fs = fs;
     _taskAttemptBuilder = taskAttemptBuilder;
     _stateStores = stateStores;
     _dynamicConfig = dynamicConfig;
+
+    _workUnits = getWorkUnits();
+    _jobState = getJobState();
+    _lock = new ReentrantLock();
+    _taskAttemptBuilt = _lock.newCondition();
   }
 
   public void run()
       throws IOException, InterruptedException {
-    List<WorkUnit> workUnits = getWorkUnits();
 
-    JobState jobState = getJobState();
     // Add dynamic configuration to the job state
-    _dynamicConfig.entrySet().forEach(e -> jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
+    _dynamicConfig.entrySet().forEach(e -> _jobState.setProp(e.getKey(), e.getValue().unwrapped().toString()));
 
-    Config jobConfig = getConfigFromJobState(jobState);
+    Config jobConfig = getConfigFromJobState(_jobState);
 
     _logger.debug("SingleTask.run: jobId {} workUnitFilePath {} jobStateFilePath {} jobState {} jobConfig {}",
-        _jobId, _workUnitFilePath, _jobStateFilePath, jobState, jobConfig);
+        _jobId, _workUnitFilePath, _jobStateFilePath, _jobState, jobConfig);
 
     try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory
         .createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
-      SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker);
+      SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(_jobState, globalBroker);
+
+      // Secure atomicity of taskAttempt's execution.
+      _lock.lock();
+      try {
+        _taskAttempt = _taskAttemptBuilder.build(_workUnits.iterator(), _jobId, _jobState, jobBroker);
 
 Review comment:
   Looks like the attempt creation can be moved outside the critical section.
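
The general idea behind this suggestion is to do the potentially slow construction without holding the lock, and to take the lock only to publish the result. A minimal sketch under that assumption is below; `NarrowCriticalSection`, `buildAndPublish`, and `current` are hypothetical names, and whether this is safe for `SingleTask` depends on how cancellation interacts with the build step, which the sketch does not model.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical illustration, not Gobblin code: build outside the lock, publish inside it.
class NarrowCriticalSection<T> {
  private final Lock lock = new ReentrantLock();
  private T attempt; // guarded by lock

  void buildAndPublish(Supplier<T> builder) {
    T built = builder.get(); // potentially slow work; no lock is held here
    lock.lock();
    try {
      attempt = built;       // the critical section only covers the shared field
    } finally {
      lock.unlock();
    }
  }

  // Returns the published attempt, or null if none has been published yet.
  T current() {
    lock.lock();
    try {
      return attempt;
    } finally {
      lock.unlock();
    }
  }
}
```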


[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r389199207
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
 ##########
 @@ -50,49 +54,74 @@
 public class SingleTask {
 
   private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class);
+  public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = "maxRetryBlockedOnTaskAttemptInit";
+  public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
 
-  private GobblinMultiTaskAttempt _taskattempt;
+  private GobblinMultiTaskAttempt _taskAttempt;
   private String _jobId;
   private Path _workUnitFilePath;
   private Path _jobStateFilePath;
   private FileSystem _fs;
   private TaskAttemptBuilder _taskAttemptBuilder;
   private StateStores _stateStores;
   private Config _dynamicConfig;
+  private List<WorkUnit> _workUnits;
+  private JobState _jobState;
 
+  // Prevent Helix from calling cancel before taskAttempt is created.
+  // Checking whether taskAttempt is null is not enough: the canceller runs in a different thread from the runner,
+  // and the case to avoid here is taskAttempt being created and starting to run after cancel has been called.
+  private Condition _taskAttemptBuilt;
+  private Lock _lock;
+
+  /**
+   * Do all heavy-lifting of initialization in constructor which could be retried if failed,
+   * see the example in {@link GobblinHelixTask}.
+   */
   SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
     _jobId = jobId;
     _workUnitFilePath = workUnitFilePath;
     _jobStateFilePath = jobStateFilePath;
     _fs = fs;
     _taskAttemptBuilder = taskAttemptBuilder;
     _stateStores = stateStores;
     _dynamicConfig = dynamicConfig;
+
+    _workUnits = getWorkUnits();
 
 Review comment:
   Originally I was thinking of doing all the heavy-lifting work in the constructor (which also makes the retry there more meaningful). I don't have a strong opinion on this, though.


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r389205591
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -456,6 +464,33 @@ private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch coun
     }
   }
 
+  /**
+   * As the initialization of {@link Task} could have unstable external connection which could be healed through
+   * retry, adding retry-wrapper here for the sake of fault-tolerance.
+   */
+  private Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
+    Config config = ConfigUtils.propertiesToConfig(this.jobState.getProperties())
+        .withValue(RETRY_TIME_OUT_MS, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.toMillis(1L)))
+        .withValue(RETRY_INTERVAL_MS, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.toMillis(2L)));
+    Retryer<Task> retryer = RetryerFactory.newInstance(config);
+    // An "effectively final" variable for counting how many retried has been done, mostly for logging purpose.
+    final AtomicInteger counter = new AtomicInteger(0);
+
+    try {
+      return retryer.call(new Callable<Task>() {
+        @Override
+        public Task call()
+            throws Exception {
+          counter.incrementAndGet();
+          log.info(String.format("The %s time that trying create Task object", counter.get()));
 
 Review comment:
   "Task creation Attempt# %s"?


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r388437144
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
 ##########
 @@ -50,49 +54,74 @@
 public class SingleTask {
 
   private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class);
+  public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = "maxRetryBlockedOnTaskAttemptInit";
+  public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
 
-  private GobblinMultiTaskAttempt _taskattempt;
+  private GobblinMultiTaskAttempt _taskAttempt;
   private String _jobId;
   private Path _workUnitFilePath;
   private Path _jobStateFilePath;
   private FileSystem _fs;
   private TaskAttemptBuilder _taskAttemptBuilder;
   private StateStores _stateStores;
   private Config _dynamicConfig;
+  private List<WorkUnit> _workUnits;
+  private JobState _jobState;
 
+  // Preventing Helix calling cancel before taskAttempt is created
+  // Checking if taskAttempt is empty is not enough, since canceller runs in different thread as runner, the case to
+  // to avoided here is taskAttempt being created and start to run after cancel has been called.
+  private Condition _taskAttemptBuilt;
+  private Lock _lock;
+
+  /**
+   * Do all heavy-lifting of initialization in constructor which could be retried if failed,
+   * see the example in {@link GobblinHelixTask}.
+   */
   SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) {
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores, Config dynamicConfig) throws IOException {
     _jobId = jobId;
     _workUnitFilePath = workUnitFilePath;
     _jobStateFilePath = jobStateFilePath;
     _fs = fs;
     _taskAttemptBuilder = taskAttemptBuilder;
     _stateStores = stateStores;
     _dynamicConfig = dynamicConfig;
+
+    _workUnits = getWorkUnits();
 
 Review comment:
   Why do we have to eagerly initialize workunits?
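
For comparison, a lazy alternative could look like the sketch below, which defers the load until first use and caches the result. `LazyValue` is a hypothetical helper, not something in the PR or in Gobblin; the trade-off is that a failing load then surfaces at first use rather than inside the (retried) constructor.

```java
import java.util.function.Supplier;

/** Hypothetical memoizing wrapper: runs the loader at most once, on first get(). */
final class LazyValue<T> implements Supplier<T> {
  private final Supplier<T> loader;
  private T value;         // guarded by "this"
  private boolean loaded;  // guarded by "this"

  LazyValue(Supplier<T> loader) {
    this.loader = loader;
  }

  @Override
  public synchronized T get() {
    if (!loaded) {
      value = loader.get();  // if this throws, loaded stays false and the next get() retries
      loaded = true;
    }
    return value;
  }

  public static void main(String[] args) {
    LazyValue<String> workUnits = new LazyValue<>(() -> {
      System.out.println("loading work units");
      return "workunits";
    });
    System.out.println(workUnits.get());
    System.out.println(workUnits.get());  // served from the cache, loader is not called again
  }
}
```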


[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r389198832
 
 

 ##########
 File path: gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 ##########
 @@ -97,28 +105,35 @@ public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                              builder.getAppWorkPath(),
                              this.jobId);
 
-    Config dynamicConfig = builder.getDynamicConfig()
-        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
-        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
-        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId));
-
     Integer partitionNum = getPartitionForHelixTask(taskDriver);
-
     if (partitionNum == null) {
       throw new IllegalStateException(String.format("Task %s, job %s on instance %s has no partition assigned",
           this.helixTaskId, builder.getInstanceName(), this.helixJobId));
     }
 
-    dynamicConfig = dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
-    this.task = new SingleTask(this.jobId,
-                               this.workUnitFilePath,
-                               jobStateFilePath,
-                               builder.getFs(),
-                               taskAttemptBuilder,
-                               stateStores,
-                               dynamicConfig);
+    final Config taskLevelConfig = builder.getConfig()
+        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getHostName()))
+        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, ConfigValueFactory.fromAnyRef(this.helixTaskId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(partitionNum));
+
+    Retryer<SingleTask> retryer = RetryerFactory.newInstance(taskLevelConfig);
 
 Review comment:
   I added some other components to SingleTask's constructor, including `getJobState` and `getWorkunits`. I have no strong opinion on whether we should retry here, but since `getJobState` is still an "external" call (i.e. even if we keep `getWorkunits` called lazily), I think the retry is still good to have.
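
To make the idea concrete, here is a rough sketch of retrying a constructor that performs I/O. It uses a plain bounded loop with a fixed wait instead of Gobblin's `RetryerFactory`; `ConstructWithRetry`, `IoFactory`, and the `maxAttempts` / `intervalMs` parameters are illustrative assumptions, not names from the PR.

```java
import java.io.IOException;

/** Hypothetical helper: retries a construction step that may fail with IOException. */
final class ConstructWithRetry {

  /** Stand-in for a constructor call such as "new SingleTask(...)" that throws IOException. */
  interface IoFactory<T> {
    T create() throws IOException;
  }

  static <T> T construct(IoFactory<T> factory, int maxAttempts, long intervalMs) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    IOException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return factory.create();
      } catch (IOException e) {
        last = e;
        if (attempt < maxAttempts) {
          sleepBeforeRetry(intervalMs);  // fixed wait; no wait after the final attempt
        }
      }
    }
    throw new IllegalStateException("Construction failed after " + maxAttempts + " attempts", last);
  }

  private static void sleepBeforeRetry(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();  // preserve the interrupt for callers
      throw new IllegalStateException("Interrupted while waiting to retry", ie);
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String task = construct(() -> {
      if (++calls[0] < 3) {
        throw new IOException("transient failure " + calls[0]);
      }
      return "task";
    }, 5, 10L);
    System.out.println(task + " built after " + calls[0] + " attempts");
  }
}
```

Retrying the whole constructor only pays off if the external calls (here, whatever `create()` does) actually happen inside it, which is the point of moving `getJobState` there.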


[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #2909: [GOBBLIN-1071] Retry task initialization
URL: https://github.com/apache/incubator-gobblin/pull/2909#discussion_r389205881
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
 ##########
 @@ -456,6 +464,33 @@ private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch coun
     }
   }
 
+  /**
+   * As the initialization of {@link Task} could have unstable external connection which could be healed through
+   * retry, adding retry-wrapper here for the sake of fault-tolerance.
+   */
+  private Task createTaskWithRetry(WorkUnitState workUnitState, CountDownLatch countDownLatch) {
+    Config config = ConfigUtils.propertiesToConfig(this.jobState.getProperties())
+        .withValue(RETRY_TIME_OUT_MS, ConfigValueFactory.fromAnyRef(TimeUnit.MINUTES.toMillis(1L)))
+        .withValue(RETRY_INTERVAL_MS, ConfigValueFactory.fromAnyRef(TimeUnit.SECONDS.toMillis(2L)));
+    Retryer<Task> retryer = RetryerFactory.newInstance(config);
+    // An "effectively final" variable for counting how many retried has been done, mostly for logging purpose.
+    final AtomicInteger counter = new AtomicInteger(0);
+
+    try {
+      return retryer.call(new Callable<Task>() {
+        @Override
+        public Task call()
+            throws Exception {
+          counter.incrementAndGet();
+          log.info(String.format("The %s time that trying create Task object", counter.get()));
+          return createTaskRunnable(workUnitState, countDownLatch);
+        }
+      });
+    } catch (Exception e) {
+      throw new RuntimeException("Execution in creating a Task-with-retry failed", e);
 
 Review comment:
   "Exception creating Task after k retries"?
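
One way to get the attempt count into both the log line and the final exception is sketched below. It is a simplified stand-in for the `Retryer`-based code in the diff: `CountingRetry`, its `maxAttempts` parameter, and the message wording are assumptions for illustration, and it reports attempts rather than retries (retries = attempts - 1).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that counts attempts and reports the count on failure. */
final class CountingRetry {

  static <T> T call(Callable<T> body, int maxAttempts, String what) {
    // Shared counter so the wrapped callable and the failure message agree on the count.
    final AtomicInteger counter = new AtomicInteger(0);
    Callable<T> counted = () -> {
      int attempt = counter.incrementAndGet();
      System.out.println(String.format("%s creation attempt# %d", what, attempt));
      return body.call();
    };

    Exception last = null;
    while (counter.get() < maxAttempts) {
      try {
        return counted.call();
      } catch (Exception e) {
        last = e;  // remember the most recent failure as the cause
      }
    }
    throw new RuntimeException(
        String.format("Exception creating %s after %d attempts", what, counter.get()), last);
  }

  public static void main(String[] args) {
    try {
      call(() -> {
        throw new IllegalStateException("connection refused");
      }, 2, "Task");
    } catch (RuntimeException e) {
      System.out.println(e.getMessage());
    }
  }
}
```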
