Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/02/28 12:23:34 UTC

[GitHub] [airflow] baolsen opened a new pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

baolsen opened a new pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585
 
 
   …running
   
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395986215
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -331,16 +372,52 @@ def _update_datasync_task(self):
         self.log.info("Updated TaskArn %s", self.task_arn)
         return self.task_arn
 
-    def _execute_datasync_task(self):
-        """Create and monitor an AWSDataSync TaskExecution for a Task."""
-        hook = self.get_hook()
+    def _wait_get_status_before_start(
+            self,
+            max_iterations=12 * 180):  # wait_interval_seconds*12*180=180 minutes by default
+        """
+        Wait until the Task can be started.
 
+        The Task can be started when its Status is not in TASK_STATUS_WAIT_BEFORE_START
+        Uses wait_interval_seconds (which is also used while waiting for TaskExecution)
+        So, max_iterations=12*180 gives 180 minutes wait by default.
+        """
+        hook = self.get_hook()
+        task_status = hook.get_task_description(self.task_arn)['Status']
+        iteration = 0
+        while task_status in self.TASK_STATUS_WAIT_BEFORE_START:
+            self.log.info(
+                'Task status is %s.'
+                ' Waiting for it to not be %s.'
+                ' Iteration %s/%s.',
+                task_status,
+                self.TASK_STATUS_WAIT_BEFORE_START,
+                iteration,
+                max_iterations)
+            time.sleep(self.wait_interval_seconds)
+            task_status = hook.get_task_description(self.task_arn)['Status']
+            iteration = iteration + 1
+            if iteration >= max_iterations:
+                break
+
+        return task_status
 
 Review comment:
   ```suggestion
           hook = self.get_hook()
           for iteration in range(max_iterations):
               task_status = hook.get_task_description(self.task_arn)['Status']
               self.log.info(
                   'Task status is %s.'
                   ' Waiting for it to not be %s.'
                   ' Iteration %s/%s.',
                   task_status,
                   self.TASK_STATUS_WAIT_BEFORE_START,
                   iteration,
                   max_iterations)
               if task_status not in self.TASK_STATUS_WAIT_BEFORE_START:
                   break
               time.sleep(self.wait_interval_seconds)
           return task_status
   ```
   WDYT?
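
A minimal, self-contained sketch of the bounded polling loop discussed in this comment. It is not the PR's code: `wait_for_status`, `get_status`, and the injectable `sleep` are hypothetical names standing in for the operator method and the `hook.get_task_description(...)['Status']` call. It reads the status once before looping, so a value is always returned even when `max_iterations` is 0, and it never sleeps after the final check.

```python
import time
from typing import Callable, Collection


def wait_for_status(
    get_status: Callable[[], str],
    wait_statuses: Collection[str],
    max_iterations: int = 12 * 180,
    wait_interval_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it leaves wait_statuses or the budget runs out.

    All names here are hypothetical; get_status stands in for the hook call.
    """
    # Read the status once up front so there is always something to return.
    status = get_status()
    for _ in range(max_iterations):
        if status not in wait_statuses:
            break
        sleep(wait_interval_seconds)
        # Re-read after every sleep so the returned status is never stale.
        status = get_status()
    return status
```

Passing `sleep` as a parameter is only a testing convenience: a unit test can hand in a no-op instead of waiting for real.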



[GitHub] [airflow] feluelle commented on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle commented on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#issuecomment-610885885
 
 
   @baolsen gentle ping :)


[GitHub] [airflow] feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395986579
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -331,16 +372,52 @@ def _update_datasync_task(self):
         self.log.info("Updated TaskArn %s", self.task_arn)
         return self.task_arn
 
-    def _execute_datasync_task(self):
-        """Create and monitor an AWSDataSync TaskExecution for a Task."""
-        hook = self.get_hook()
+    def _wait_get_status_before_start(
+            self,
+            max_iterations=12 * 180):  # wait_interval_seconds*12*180=180 minutes by default
 
 Review comment:
   ```suggestion
               max_iterations: int = 12 * 180) -> str:
   ```
   You already have that comment in the docstring below. Even better would be to add a `:param` entry and move the comment there.
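
The inline comment's arithmetic can be checked with a short sketch. The diff does not show the default of `wait_interval_seconds`; the value of 5 seconds below is an assumption, inferred from the comment treating 12 polls as one minute. `max_wait_minutes` is a hypothetical helper, not part of the operator.

```python
def max_wait_minutes(max_iterations: int, wait_interval_seconds: float) -> float:
    """Upper bound on time spent sleeping between polls, in minutes."""
    return max_iterations * wait_interval_seconds / 60


DEFAULT_MAX_ITERATIONS = 12 * 180

# Assumption: a 5-second interval, implied by "12 polls per minute".
ASSUMED_WAIT_INTERVAL_SECONDS = 5

default_bound = max_wait_minutes(DEFAULT_MAX_ITERATIONS, ASSUMED_WAIT_INTERVAL_SECONDS)
# 2160 polls * 5 s = 10800 s = 180 minutes

# The bound scales with the interval, so "180 minutes" only holds for 5 s.
slower_bound = max_wait_minutes(DEFAULT_MAX_ITERATIONS, 30)
# 2160 polls * 30 s = 64800 s = 1080 minutes
```

Because the bound depends on a second setting, a `:param` description that states the formula is likely to age better than one that hard-codes "180 minutes".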


[GitHub] [airflow] feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395985184
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -208,12 +217,41 @@ def execute(self, context):
 
         self.log.info("Using DataSync TaskArn %s", self.task_arn)
 
-        # Update the DataSync Task
+        # Update the DataSync Task definition
         if self.update_task_kwargs:
             self._update_datasync_task()
 
-        # Execute the DataSync Task
-        self._execute_datasync_task()
+        # Wait for the Task to be in a valid state to Start
+        self.task_status = self._wait_get_status_before_start()
+
+        self.log.info('Task status is %s.', self.task_status)
+        if self.task_status in self.TASK_STATUS_START:
+            self.log.info(
+                'The Task will be started because its status is in %s.',
+                self.TASK_STATUS_START)
+            # Start the DataSync Task
+            self._start_datasync_task()
+        elif self.task_status in self.TASK_STATUS_SKIP_START:
+            self.log.info(
+                'The Task will NOT be started because its status is in %s.',
+                self.TASK_STATUS_SKIP_START)
+            if not self.task_execution_arn:
+                task_description = self.get_hook().get_task_description(self.task_arn)
+                if 'CurrentTaskExecutionArn' in task_description:
+                    self.task_execution_arn = task_description['CurrentTaskExecutionArn']
+                else:
+                    raise AirflowException(
+                        'Starting the Task was skipped,'
+                        ' but no CurrentTaskExecutionArn was found.')
+        elif self.task_status in self.TASK_STATUS_FAIL:
+            raise AirflowException(
+                'Task cannot be started because its status is in %s.'
+                % self.TASK_STATUS_FAIL
+            )
+        else:
+            raise AirflowException('Unexpected task status %s.' % self.task_status)
 
 Review comment:
   ```suggestion
               raise AirflowException(f'Unexpected task status {self.task_status}')
   ```
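
The branching in this hunk can be sketched as a standalone function. The status groupings below are assumptions for illustration, since the diff does not show how `TASK_STATUS_START`, `TASK_STATUS_SKIP_START`, and `TASK_STATUS_FAIL` are defined; `RuntimeError` stands in for `AirflowException`, and `Action` and `decide_action` are hypothetical names.

```python
from enum import Enum


class Action(Enum):
    START = "start"            # start a new TaskExecution
    SKIP_START = "skip_start"  # attach to the execution already in progress


# Assumed groupings; the real constants are not visible in the diff.
TASK_STATUS_START = ("AVAILABLE",)
TASK_STATUS_SKIP_START = ("RUNNING", "QUEUED")
TASK_STATUS_FAIL = ("UNAVAILABLE",)


def decide_action(task_status: str) -> Action:
    """Map a Task status to what the operator should do next."""
    if task_status in TASK_STATUS_START:
        return Action.START
    if task_status in TASK_STATUS_SKIP_START:
        return Action.SKIP_START
    if task_status in TASK_STATUS_FAIL:
        # RuntimeError is a stand-in for AirflowException.
        raise RuntimeError(
            f"Task cannot be started because its status is in {TASK_STATUS_FAIL}."
        )
    # Anything else (including a status still in the waiting set after the
    # iteration budget ran out) is treated as unexpected.
    raise RuntimeError(f"Unexpected task status {task_status}")
```

Keeping the decision separate from the side effects makes each branch easy to unit test without mocking the hook.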


[GitHub] [airflow] codecov-io edited a comment on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#issuecomment-592522118
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7585?src=pr&el=h1) Report
   > Merging [#7585](https://codecov.io/gh/apache/airflow/pull/7585?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/bbfc18e7e01fdd3a263ddb0b2b4cce8f2c06f732?src=pr&el=desc) will **decrease** coverage by `0.32%`.
   > The diff coverage is `23.25%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7585/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7585?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7585      +/-   ##
   ==========================================
   - Coverage   86.85%   86.52%   -0.33%     
   ==========================================
     Files         896      896              
     Lines       42622    42657      +35     
   ==========================================
   - Hits        37021    36911     -110     
   - Misses       5601     5746     +145
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7585?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/providers/amazon/aws/hooks/datasync.py](https://codecov.io/gh/apache/airflow/pull/7585/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9kYXRhc3luYy5weQ==) | `16.78% <0%> (ø)` | :arrow_up: |
   | [airflow/providers/amazon/aws/operators/datasync.py](https://codecov.io/gh/apache/airflow/pull/7585/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9vcGVyYXRvcnMvZGF0YXN5bmMucHk=) | `32.63% <23.8%> (-1.57%)` | :arrow_down: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7585/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7585/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7585/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7585/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7585/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7585?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7585?src=pr&el=footer). Last update [bbfc18e...f7c5571](https://codecov.io/gh/apache/airflow/pull/7585?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] nuclearpinguin commented on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#issuecomment-599725428
 
 
   > 1. Wait for task to be able to be started - implemented in the DataSync Operator.
   > 2. Wait for task execution to be completed - implemented in the DataSync hook. (Task execution is what you get from AWS after triggering a Task to start)
   > 
   > What is your recommendation to standardise these?
   
   I would say that we should not wait for the task to be able to start. If something has to happen before the task, we should use a sensor or wait for completion in the upstream operator. For example, in the case of `CreateResource >> UseResource`, the `CreateResource` op should check whether the resource was successfully created. This is the approach we use in GCP operators/hooks.
   
   


[GitHub] [airflow] feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395986631
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -331,16 +372,52 @@ def _update_datasync_task(self):
         self.log.info("Updated TaskArn %s", self.task_arn)
         return self.task_arn
 
-    def _execute_datasync_task(self):
-        """Create and monitor an AWSDataSync TaskExecution for a Task."""
-        hook = self.get_hook()
+    def _wait_get_status_before_start(
+            self,
+            max_iterations=12 * 180):  # wait_interval_seconds*12*180=180 minutes by default
+        """
+        Wait until the Task can be started.
 
+        The Task can be started when its Status is not in TASK_STATUS_WAIT_BEFORE_START
+        Uses wait_interval_seconds (which is also used while waiting for TaskExecution)
+        So, max_iterations=12*180 gives 180 minutes wait by default.
+        """
 
 Review comment:
   Please also add `:returns:` here.
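
One possible shape for the requested docstring, shown on a standalone function so that it runs without Airflow. This is a sketch, not the PR's final wording: `get_status` is a hypothetical stand-in for the hook call, and the value of `TASK_STATUS_WAIT_BEFORE_START` is assumed because the diff does not show it.

```python
import time
from typing import Callable, Tuple

# Assumed value, for illustration only; the diff does not show the constant.
TASK_STATUS_WAIT_BEFORE_START: Tuple[str, ...] = ("CREATING",)


def wait_get_status_before_start(
    get_status: Callable[[], str],
    wait_interval_seconds: float,
    max_iterations: int = 12 * 180,
) -> str:
    """
    Wait until the Task can be started.

    The Task can be started when its status is not in
    TASK_STATUS_WAIT_BEFORE_START.

    :param get_status: Callable returning the current Task status
        (hypothetical stand-in for the hook call).
    :param wait_interval_seconds: Seconds to sleep between polls.
    :param max_iterations: Maximum number of polls while waiting. The total
        wait is bounded by roughly ``max_iterations * wait_interval_seconds``.
    :returns: The last Task status observed, which may still be a waiting
        status if the iteration budget ran out.
    """
    status = get_status()
    for _ in range(max_iterations):
        if status not in TASK_STATUS_WAIT_BEFORE_START:
            break
        time.sleep(wait_interval_seconds)
        status = get_status()
    return status
```

Spelling out that the return value may still be a waiting status tells the caller it has to handle the timeout case itself.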


[GitHub] [airflow] baolsen commented on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
baolsen commented on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#issuecomment-599491925
 
 
   Hi @feluelle / @nuclearpinguin, I would appreciate a review :)
   
   One thing I am aware of - currently there are 2 wait loops:
   1. Wait for task to be able to be started - implemented in the DataSync Operator. 
   2. Wait for task execution to be completed - implemented in the DataSync hook. (Task execution is what you get from AWS after triggering a Task to start)
   
   What is your recommendation to standardise these?
   I have no preference. I just don't want a Sensor, because the Operator is currently self-contained in terms of creating/deleting/running tasks. I'd like to move both wait loops into either the hook or the operator; I'm not sure which is typically the best place.


[GitHub] [airflow] feluelle edited a comment on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle edited a comment on issue #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#issuecomment-602034547
 
 
   > 
   > 
   > > 1. Wait for task to be able to be started - implemented in the DataSync Operator.
   > > 2. Wait for task execution to be completed - implemented in the DataSync hook. (Task execution is what you get from AWS after triggering a Task to start)
   > > 
   > > What is your recommendation to standardise these?
   > 
   > I would say that we should not wait for task to be able to start. If something has to happen before the task we should use sensor or wait for the completion in the upstream operator. For example in case of `CreateResource >> UseResource` the `CreateResource` op should check if the resource was successfully created. This is the approach we use in GCP operators/hooks.
   
   I think both options are useful to have. In https://github.com/apache/airflow/pull/7410 (Tableau Integration) I added a `blocking` parameter which, when set to True, enables a sensor task. If set to False, it does not wait (=async).
   See: https://github.com/apache/airflow/pull/7410/files#diff-4f65033904d8c2b6c064e959d218d513R71
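
The `blocking` idea can be sketched in plain Python. This is not the code from the linked PR: `run_job`, `start`, and `is_finished` are hypothetical names, and the polling loop stands in for whatever sensor the real operator uses.

```python
import time
from typing import Callable


def run_job(
    start: Callable[[], str],
    is_finished: Callable[[str], bool],
    blocking: bool = True,
    poll_interval_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Start a job and optionally wait for it; returns the job id either way."""
    job_id = start()
    if not blocking:
        # Asynchronous mode: hand the id back and let a downstream
        # task or sensor decide when and how to wait.
        return job_id
    # Blocking mode: poll until the job reports completion.
    # A real implementation would also bound this loop with a timeout.
    while not is_finished(job_id):
        sleep(poll_interval_seconds)
    return job_id
```

Returning the job id in both modes lets the same operator be used on its own or paired with a separate sensor, which covers both positions taken in this thread.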
   


[GitHub] [airflow] feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395985146
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -208,12 +217,41 @@ def execute(self, context):
 
         self.log.info("Using DataSync TaskArn %s", self.task_arn)
 
-        # Update the DataSync Task
+        # Update the DataSync Task definition
         if self.update_task_kwargs:
             self._update_datasync_task()
 
-        # Execute the DataSync Task
-        self._execute_datasync_task()
+        # Wait for the Task to be in a valid state to Start
+        self.task_status = self._wait_get_status_before_start()
+
+        self.log.info('Task status is %s.', self.task_status)
+        if self.task_status in self.TASK_STATUS_START:
+            self.log.info(
+                'The Task will be started because its status is in %s.',
+                self.TASK_STATUS_START)
+            # Start the DataSync Task
+            self._start_datasync_task()
+        elif self.task_status in self.TASK_STATUS_SKIP_START:
+            self.log.info(
+                'The Task will NOT be started because its status is in %s.',
+                self.TASK_STATUS_SKIP_START)
+            if not self.task_execution_arn:
+                task_description = self.get_hook().get_task_description(self.task_arn)
+                if 'CurrentTaskExecutionArn' in task_description:
+                    self.task_execution_arn = task_description['CurrentTaskExecutionArn']
+                else:
+                    raise AirflowException(
+                        'Starting the Task was skipped,'
+                        ' but no CurrentTaskExecutionArn was found.')
+        elif self.task_status in self.TASK_STATUS_FAIL:
+            raise AirflowException(
+                'Task cannot be started because its status is in %s.'
+                % self.TASK_STATUS_FAIL
+            )
 
 Review comment:
   ```suggestion
               raise AirflowException(
                   f'Task cannot be started because its status is in {self.TASK_STATUS_FAIL}.'
               )
   ```
   Please use `f-string`s :)
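
Beyond style, there may be a correctness reason for this change. If `TASK_STATUS_FAIL` is a tuple (an assumption, since the diff does not show its definition), `%` formatting unpacks it as the argument list instead of printing it. The sketch below uses hypothetical status names to show the difference.

```python
def percent_message(statuses):
    # With a tuple on the right-hand side, % treats each element
    # as a separate format argument.
    return 'Task cannot be started because its status is in %s.' % statuses


def fstring_message(statuses):
    # An f-string always formats the object itself.
    return f'Task cannot be started because its status is in {statuses}.'


# Hypothetical status names, for illustration only.
one_status = ("FAILED_A",)
two_statuses = ("FAILED_A", "FAILED_B")

# One element: % silently prints the element, not the tuple.
single_percent = percent_message(one_status)
single_fstring = fstring_message(one_status)

# Two elements: % raises TypeError because only one %s is available.
try:
    percent_message(two_statuses)
    percent_failed = False
except TypeError:
    percent_failed = True

double_fstring = fstring_message(two_statuses)
```

With the f-string, the message stays well-formed however many statuses the constant holds.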


[GitHub] [airflow] feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r395984966
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -208,12 +217,41 @@ def execute(self, context):
 
         self.log.info("Using DataSync TaskArn %s", self.task_arn)
 
-        # Update the DataSync Task
+        # Update the DataSync Task definition
         if self.update_task_kwargs:
             self._update_datasync_task()
 
-        # Execute the DataSync Task
-        self._execute_datasync_task()
+        # Wait for the Task to be in a valid state to Start
+        self.task_status = self._wait_get_status_before_start()
+
+        self.log.info('Task status is %s.', self.task_status)
+        if self.task_status in self.TASK_STATUS_START:
+            self.log.info(
+                'The Task will be started because its status is in %s.',
+                self.TASK_STATUS_START)
+            # Start the DataSync Task
 
 Review comment:
   ```suggestion
   ```


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …

Posted by GitBox <gi...@apache.org>.
nuclearpinguin commented on a change in pull request #7585: [AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …
URL: https://github.com/apache/airflow/pull/7585#discussion_r393266617
 
 

 ##########
 File path: airflow/providers/amazon/aws/operators/datasync.py
 ##########
 @@ -331,16 +372,52 @@ def _update_datasync_task(self):
         self.log.info("Updated TaskArn %s", self.task_arn)
         return self.task_arn
 
-    def _execute_datasync_task(self):
-        """Create and monitor an AWSDataSync TaskExecution for a Task."""
-        hook = self.get_hook()
+    def _wait_get_status_before_start(
 
 Review comment:
   ```suggestion
       def _wait_for_status(
   ```



