Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/04 19:25:06 UTC

[GitHub] [airflow] baxievski opened a new pull request #12096: add xcom push for ECSOperator

baxievski opened a new pull request #12096:
URL: https://github.com/apache/airflow/pull/12096


   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] feluelle commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r553873793



##########
File path: UPDATING.md
##########
@@ -742,6 +742,7 @@ The following operators were affected:
 * BashOperator (Not backwards compatible)
 * DockerOperator (Not backwards compatible)
 * SimpleHttpOperator (Not backwards compatible)
+* ECSOperator (Not backwards compatible)

Review comment:
       Oh @baxievski, I think putting this line here would be confusing, since version 2.0.0 has already been released. With that in mind, if this goes into the next release after 2.0.0, it is no longer a breaking change, is it?
   
   So I would suggest we remove it completely.
   







[GitHub] [airflow] baxievski commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r517614042



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -260,6 +265,24 @@ def _wait_for_task_ended(self) -> None:
         waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
         waiter.wait(cluster=self.cluster, tasks=[self.arn])
 
+    def _cloudwatch_log_events(self):
+        if not self.awslogs_group:
+            return
+
+        if not self.awslogs_stream_prefix:
+            return
+
+        task_id = self.arn.split("/")[-1]
+        stream_name = "{}/{}".format(self.awslogs_stream_prefix, task_id)

Review comment:
       Fixed `stream_name`







[GitHub] [airflow] feluelle commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r553873793



##########
File path: UPDATING.md
##########
@@ -742,6 +742,7 @@ The following operators were affected:
 * BashOperator (Not backwards compatible)
 * DockerOperator (Not backwards compatible)
 * SimpleHttpOperator (Not backwards compatible)
+* ECSOperator (Not backwards compatible)

Review comment:
       Oh @baxievski, I think putting this line here would be confusing, since version 2.0.0 has already been released. With that in mind, if this goes into the next release after 2.0.0, it is no longer a breaking change, is it?
   
   So I would suggest we remove it completely.
   
   ```suggestion
   ```







[GitHub] [airflow] baxievski commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r517611582



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -260,6 +265,24 @@ def _wait_for_task_ended(self) -> None:
         waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
         waiter.wait(cluster=self.cluster, tasks=[self.arn])
 
+    def _cloudwatch_log_events(self):
+        if not self.awslogs_group:
+            return
+
+        if not self.awslogs_stream_prefix:
+            return
+
+        task_id = self.arn.split("/")[-1]
+        stream_name = "{}/{}".format(self.awslogs_stream_prefix, task_id)
+
+        return self.get_logs_hook().get_log_events(self.awslogs_group, stream_name)
+
+    def _last_log_event(self):
+        log_events = self._cloudwatch_log_events()
+        if log_events is not None:
+            return deque(log_events, maxlen=1).pop()["message"]

Review comment:
       It's a memory-efficient way to get the last item from a generator: the generator is still consumed in full, but the deque only ever holds the most recent item, so there's no need to build a list of every event just to get to the last one.
   
   It might make a difference when there are lots of events.
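A minimal sketch of the `deque(..., maxlen=1)` idiom in isolation. `fake_log_events` is a hypothetical generator standing in for the CloudWatch event stream, with `message`/`timestamp` keys assumed to mirror the events mocked in the tests. The generator is still iterated to the end; the saving is memory, not iteration. Unlike the diff, this version also returns `None` for an empty stream, where `pop()` on an empty deque would raise `IndexError`.

```python
from collections import deque
from typing import Iterable, Iterator, Optional


def fake_log_events(n: int) -> Iterator[dict]:
    """Hypothetical stand-in for a CloudWatch log-event generator."""
    for i in range(n):
        yield {"message": f"line {i}", "timestamp": 1000 + i}


def last_message(events: Iterable[dict]) -> Optional[str]:
    # maxlen=1 makes the deque drop older items as new ones arrive, so the
    # generator is fully consumed but only the final event stays in memory.
    tail = deque(events, maxlen=1)
    if not tail:
        return None  # empty stream: pop() would raise IndexError here
    return tail.pop()["message"]


print(last_message(fake_log_events(100_000)))  # line 99999
print(last_message(fake_log_events(0)))  # None
```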







[GitHub] [airflow] feluelle commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r551187666



##########
File path: tests/providers/amazon/aws/operators/test_ecs.py
##########
@@ -314,3 +314,22 @@ def test_reattach_successful(self, launch_type, tags, start_mock, check_mock, wa
         self.assertEqual(
             self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
         )
+
+    @mock.patch.object(
+        ECSOperator, '_cloudwatch_log_events', return_value=[{"message": "Log output", "timestamp": 1000}]
+    )
+    def test_execute_xcom_with_log(self, _):
+        self.ecs.do_xcom_push = True
+        self.assertEqual(self.ecs.execute(None), "Log output")

Review comment:
       You want to make sure that the _real output_ (`log_message`) is equal to the mocked one.
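One way to follow this advice, sketched against a hypothetical `FakeECSOperator` stand-in rather than the real Airflow class (its `execute` only models the XCom return path and is an assumption): take the expected value from the mock, then compare the operator's actual return value to it.

```python
import unittest
from unittest import mock


class FakeECSOperator:
    """Hypothetical stand-in for ECSOperator; only the XCom return path is modelled."""

    def __init__(self, do_xcom_push: bool = False):
        self.do_xcom_push = do_xcom_push

    def _cloudwatch_log_events(self):
        raise RuntimeError("would call AWS; tests must mock this")

    def execute(self, context):
        if not self.do_xcom_push:
            return None
        events = list(self._cloudwatch_log_events())
        return events[-1]["message"] if events else None


class TestExecuteXcom(unittest.TestCase):
    @mock.patch.object(
        FakeECSOperator,
        "_cloudwatch_log_events",
        return_value=[{"message": "Log output", "timestamp": 1000}],
    )
    def test_execute_xcom_with_log(self, mock_events):
        op = FakeECSOperator(do_xcom_push=True)
        # Derive the expectation from the mock, then compare the real return value to it.
        expected = mock_events.return_value[-1]["message"]
        self.assertEqual(op.execute(None), expected)
        mock_events.assert_called_once_with()

    def test_execute_without_xcom(self):
        self.assertIsNone(FakeECSOperator(do_xcom_push=False).execute(None))
```

Run it with `python -m unittest` or pytest.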







[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-751242166


   [The Workflow run](https://github.com/apache/airflow/actions/runs/444020240) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-757763493


   Awesome work, congrats on your first merged pull request!
   





[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-754128753


   [The Workflow run](https://github.com/apache/airflow/actions/runs/461622546) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-751241948


   [The Workflow run](https://github.com/apache/airflow/actions/runs/444018625) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-735635311


   [The Workflow run](https://github.com/apache/airflow/actions/runs/391152628) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-721940226


   [The Workflow run](https://github.com/apache/airflow/actions/runs/346202220) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-721940681


   [The Workflow run](https://github.com/apache/airflow/actions/runs/346202911) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] feluelle commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-747976100


   > > Could you please add tests.
   > 
   > Sure, I'll need to understand how the operator is currently tested.
   
   You can find the tests here: https://github.com/apache/airflow/blob/master/tests/providers/amazon/aws/operators/test_ecs.py
   Just add one that tests that `execute` returns the data you expect. You can use `unittest.mock` to mock the ECS (boto3) client under the hood.
   
   > > There are many branches returning `None`. Wouldn't it be possible and clearer if we combine them?
   > 
   > Yes, please let me know if you think it's ok now.
   
   Yes, looks good now :)
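A minimal sketch of mocking the boto3 ECS client with `unittest.mock.MagicMock`, so no AWS call is made. `start_task` is a hypothetical helper, not part of Airflow; the response shape (`tasks[0]["taskArn"]`) follows boto3's ECS `run_task`, and the ARN is the one used in the existing test.

```python
from unittest import mock


def start_task(client, cluster: str, task_definition: str) -> str:
    """Hypothetical helper: start one ECS task and return its ARN."""
    response = client.run_task(cluster=cluster, taskDefinition=task_definition)
    return response["tasks"][0]["taskArn"]


# A MagicMock stands in for the boto3 ECS client, so nothing reaches AWS.
fake_client = mock.MagicMock()
fake_client.run_task.return_value = {
    "tasks": [{"taskArn": "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55"}],
    "failures": [],
}

arn = start_task(fake_client, cluster="c", task_definition="t")
# Verify the operator-side code passed the expected arguments to boto3.
fake_client.run_task.assert_called_once_with(cluster="c", taskDefinition="t")
print(arn.split("/")[-1])  # d8c67b3c-ac87-4ffe-a847-4785bc3a8b55
```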





[GitHub] [airflow] feluelle commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-755450602


   @baxievski please rebase; the error you see in pylint is unrelated to your code and has been fixed. Hopefully everything will be green after the rebase :)
   
   @XD-DENG please re-review, you were requesting changes which I think are solved now. :)





[GitHub] [airflow] baxievski commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r532406171



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -260,6 +267,27 @@ def _wait_for_task_ended(self) -> None:
         waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
         waiter.wait(cluster=self.cluster, tasks=[self.arn])
 
+        return
+
+    def _cloudwatch_log_events(self):
+        if not self.awslogs_group:
+            return None
+
+        if not self.awslogs_stream_prefix:
+            return None

Review comment:
       Good point, I am not sure if you had something like this in mind:
   
   ```py
   if self._aws_logs_enabled():
       ...  # do stuff
   ```
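A minimal sketch of the extracted guard, assuming only the two awslogs settings matter. `AwsLogsSettings` is a hypothetical stand-in for the operator, and `_aws_logs_enabled` is the name proposed in this comment, not an existing Airflow method.

```python
from typing import Optional


class AwsLogsSettings:
    """Hypothetical stand-in holding only the two awslogs settings."""

    def __init__(self, awslogs_group: Optional[str] = None, awslogs_stream_prefix: Optional[str] = None):
        self.awslogs_group = awslogs_group
        self.awslogs_stream_prefix = awslogs_stream_prefix

    def _aws_logs_enabled(self) -> bool:
        # One predicate replaces the two early `return None` branches:
        # logs can only be fetched when both settings are non-empty.
        return bool(self.awslogs_group and self.awslogs_stream_prefix)


for settings in (AwsLogsSettings("/ecs/demo", "ecs/app"), AwsLogsSettings("/ecs/demo"), AwsLogsSettings()):
    if settings._aws_logs_enabled():
        print("fetch logs from", settings.awslogs_group)
    else:
        print("skip: awslogs not configured")
```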





[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-755495518


   The PR is likely OK to be merged with just a subset of tests for the default Python and database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest master or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] XD-DENG commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r517589217



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -260,6 +265,24 @@ def _wait_for_task_ended(self) -> None:
         waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
         waiter.wait(cluster=self.cluster, tasks=[self.arn])
 
+    def _cloudwatch_log_events(self):
+        if not self.awslogs_group:
+            return
+
+        if not self.awslogs_stream_prefix:
+            return
+
+        task_id = self.arn.split("/")[-1]
+        stream_name = "{}/{}".format(self.awslogs_stream_prefix, task_id)
+
+        return self.get_logs_hook().get_log_events(self.awslogs_group, stream_name)
+
+    def _last_log_event(self):
+        log_events = self._cloudwatch_log_events()
+        if log_events is not None:
+            return deque(log_events, maxlen=1).pop()["message"]

Review comment:
       Could you please clarify why `deque` is needed here? Thanks.

##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -260,6 +265,24 @@ def _wait_for_task_ended(self) -> None:
         waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
         waiter.wait(cluster=self.cluster, tasks=[self.arn])
 
+    def _cloudwatch_log_events(self):
+        if not self.awslogs_group:
+            return
+
+        if not self.awslogs_stream_prefix:
+            return
+
+        task_id = self.arn.split("/")[-1]
+        stream_name = "{}/{}".format(self.awslogs_stream_prefix, task_id)

Review comment:
       We have started to enforce using f-string, and this `.format()` usage should fail the CI.
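The f-string version produces exactly the same string as the `.format()` call, so this is a style change only. A sketch with a hypothetical `stream_name` helper and a made-up `ecs/my-container` prefix; the ARN is the one from the existing test.

```python
def stream_name(awslogs_stream_prefix: str, task_arn: str) -> str:
    # The task ID is the last path segment of the task ARN.
    task_id = task_arn.split("/")[-1]
    # f-string form, as preferred over str.format() in the review.
    return f"{awslogs_stream_prefix}/{task_id}"


arn = "arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55"
print(stream_name("ecs/my-container", arn))
# ecs/my-container/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55
```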







[GitHub] [airflow] baxievski commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r554763896



##########
File path: UPDATING.md
##########
@@ -742,6 +742,7 @@ The following operators were affected:
 * BashOperator (Not backwards compatible)
 * DockerOperator (Not backwards compatible)
 * SimpleHttpOperator (Not backwards compatible)
+* ECSOperator (Not backwards compatible)

Review comment:
       @feluelle right, forgot that this is not really a breaking change since 2.0.0 was released in the meantime. 







[GitHub] [airflow] XD-DENG commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
XD-DENG commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-755473398


   > @baxievski please rebase; the error you see in pylint is unrelated to your code and has been fixed. Hopefully everything will be green after the rebase :)
   > 
   > @XD-DENG please re-review, you were requesting changes which I think are solved now. :)
   
   Thanks @feluelle and @baxievski .
   
   Yep, my earlier comments have been addressed. I may not have bandwidth to re-review everything now, but please feel free to merge if everything looks good to you @feluelle 👍 I have dismissed my change request. Thanks!





[GitHub] [airflow] baxievski commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-751268631


   I added tests for the return value of `ECSOperator.execute()`



[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-751241663


   [The Workflow run](https://github.com/apache/airflow/actions/runs/444015796) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] feluelle commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r530843994



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -260,6 +267,27 @@ def _wait_for_task_ended(self) -> None:
         waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
         waiter.wait(cluster=self.cluster, tasks=[self.arn])
 
+        return
+
+    def _cloudwatch_log_events(self):
+        if not self.awslogs_group:
+            return None
+
+        if not self.awslogs_stream_prefix:
+            return None

Review comment:
       Maybe we want to pull the "requirements" out of the method and do something like: if the requirements are met, call the function, _else_ return None. WDYT?

##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -268,11 +296,9 @@ def _check_success_task(self) -> None:
         self.log.info('ECS Task stopped, check status: %s', response)
 
         # Get logs from CloudWatch if the awslogs log driver was used
-        if self.awslogs_group and self.awslogs_stream_prefix:
-            self.log.info('ECS Task logs output:')
-            task_id = self.arn.split("/")[-1]
-            stream_name = f"{self.awslogs_stream_prefix}/{task_id}"
-            for event in self.get_logs_hook().get_log_events(self.awslogs_group, stream_name):
+        log_events = self._cloudwatch_log_events()
+        if log_events is not None:
+            for event in log_events:

Review comment:
       Wouldn't it make more sense to return an empty list so you don't have to check for `None`? The loop would then simply go over 0 items.
   
   Or just do `log_events = self._cloudwatch_log_events() or []` (but I would suggest doing this in the function)
   
   WDYT?
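A minimal sketch of the empty-list suggestion, written as free functions for brevity. `fake_fetch` is a hypothetical stand-in for the CloudWatch logs hook, and the signatures are assumptions, not Airflow's API. The caller-side alternative, `self._cloudwatch_log_events() or []`, would behave the same for a `None` return.

```python
from typing import Callable, Dict, Iterable, Iterator, Optional


def fake_fetch(group: str, stream: str) -> Iterator[Dict]:
    """Hypothetical stand-in for the CloudWatch logs hook."""
    yield {"message": f"[{group}] {stream}: started"}
    yield {"message": f"[{group}] {stream}: finished"}


def cloudwatch_log_events(
    group: Optional[str],
    stream_prefix: Optional[str],
    task_id: str,
    fetch: Callable[[str, str], Iterable[Dict]] = fake_fetch,
) -> Iterable[Dict]:
    # Returning an empty list instead of None lets callers loop unconditionally.
    if not (group and stream_prefix):
        return []
    return fetch(group, f"{stream_prefix}/{task_id}")


# No `is not None` check at the call site: when awslogs is not configured,
# the loop simply runs zero times.
for event in cloudwatch_log_events("/ecs/demo", "ecs/app", "abc123"):
    print(event["message"])
for event in cloudwatch_log_events(None, None, "abc123"):
    print("never printed")
```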







[GitHub] [airflow] baxievski commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-756368078


   @feluelle I rebased and fixed my tests.





[GitHub] [airflow] baxievski commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-748302816


   > > > Could you please add tests.
   > > 
   > > Sure, I'll need to understand how the operator is currently tested.
   > 
   > You can find the tests here: https://github.com/apache/airflow/blob/master/tests/providers/amazon/aws/operators/test_ecs.py
   > Just add one that tests that `execute` returns the data you expect. You can use `unittest.mock` to mock ECS (boto3) under the hood.
   > 
   > > > There are many branches returning `None`. Wouldn't it be possible, and clearer, to combine them?
   > > 
   > > Yes, please let me know if you think it's ok now.
   > 
   > Yes, looks good now :)
   
   I haven't abandoned this PR; I'm just finding it really difficult to find free time lately. :) Hopefully this weekend...
   
   Thanks, your explanation is really helpful.
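The advice to mock boto3 "under the hood" can be sketched without any AWS access. The function `task_last_status` below is hypothetical (it is not part of `ECSOperator`); it only calls the ECS client's `describe_tasks` method, and the test swaps the client for a `unittest.mock.MagicMock` whose return value loosely mimics the shape of a `describe_tasks` response.

```python
from unittest import mock


def task_last_status(client, cluster: str, arn: str) -> str:
    """Hypothetical helper: return the lastStatus of a single ECS task."""
    response = client.describe_tasks(cluster=cluster, tasks=[arn])
    return response["tasks"][0]["lastStatus"]


def test_task_last_status_with_mocked_client() -> None:
    client = mock.MagicMock()
    # Shape loosely follows the boto3 ECS describe_tasks response.
    client.describe_tasks.return_value = {"tasks": [{"lastStatus": "STOPPED"}]}

    assert task_last_status(client, "my-cluster", "arn:aws:ecs:task/abc") == "STOPPED"
    # Verify the exact call the helper made to the (mocked) client.
    client.describe_tasks.assert_called_once_with(
        cluster="my-cluster", tasks=["arn:aws:ecs:task/abc"]
    )
```

Passing the client in as an argument keeps the sketch simple; in the real test module the operator creates its own client, so patching at that boundary (or patching a private helper, as the tests later in this thread do) serves the same purpose.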


[GitHub] [airflow] baxievski commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r532407356



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -268,11 +296,9 @@ def _check_success_task(self) -> None:
         self.log.info('ECS Task stopped, check status: %s', response)
 
         # Get logs from CloudWatch if the awslogs log driver was used
-        if self.awslogs_group and self.awslogs_stream_prefix:
-            self.log.info('ECS Task logs output:')
-            task_id = self.arn.split("/")[-1]
-            stream_name = f"{self.awslogs_stream_prefix}/{task_id}"
-            for event in self.get_logs_hook().get_log_events(self.awslogs_group, stream_name):
+        log_events = self._cloudwatch_log_events()
+        if log_events is not None:
+            for event in log_events:

Review comment:
       Changed - great point, thanks.


[GitHub] [airflow] baxievski commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-735620876


   > Could you please add tests. 
   
   Sure, I'll need to understand how the operator is currently tested.
   
   > There are many branches returning `None`. Wouldn't it be possible, and clearer, to combine them?
   
   Yes, please let me know if you think it's ok now.
   

[GitHub] [airflow] feluelle commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r553873793



##########
File path: UPDATING.md
##########
@@ -742,6 +742,7 @@ The following operators were affected:
 * BashOperator (Not backwards compatible)
 * DockerOperator (Not backwards compatible)
 * SimpleHttpOperator (Not backwards compatible)
+* ECSOperator (Not backwards compatible)

Review comment:
       Oh @baxievski, I think putting this line here would be confusing, since version 2.0.0 has already been released. And if you add it to the next release after 2.0.0, it is no longer a breaking change, is it?
   
   So I would suggest we remove it completely.
   
   ```suggestion
   ```


[GitHub] [airflow] feluelle commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r551186185



##########
File path: tests/providers/amazon/aws/operators/test_ecs.py
##########
@@ -314,3 +314,22 @@ def test_reattach_successful(self, launch_type, tags, start_mock, check_mock, wa
         self.assertEqual(
             self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
         )
+
+    @mock.patch.object(
+        ECSOperator, '_cloudwatch_log_events', return_value=[{"message": "Log output", "timestamp": 1000}]
+    )
+    def test_execute_xcom_with_log(self, _):
+        self.ecs.do_xcom_push = True
+        self.assertEqual(self.ecs.execute(None), "Log output")

Review comment:
       ```suggestion
       def test_execute_xcom_with_log(self, mock_cloudwatch_log_events):
           self.ecs.do_xcom_push = True
           log_message = self.ecs.execute(context=None)
           assert log_message == mock_cloudwatch_log_events.return_value[0]['message']
   ```
   WDYT? :)

##########
File path: tests/providers/amazon/aws/operators/test_ecs.py
##########
@@ -314,3 +314,22 @@ def test_reattach_successful(self, launch_type, tags, start_mock, check_mock, wa
         self.assertEqual(
             self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
         )
+
+    @mock.patch.object(
+        ECSOperator, '_cloudwatch_log_events', return_value=[{"message": "Log output", "timestamp": 1000}]
+    )
+    def test_execute_xcom_with_log(self, _):
+        self.ecs.do_xcom_push = True
+        self.assertEqual(self.ecs.execute(None), "Log output")
+
+    @mock.patch.object(ECSOperator, '_cloudwatch_log_events', return_value=[])
+    def test_execute_xcom_with_no_log(self, _):
+        self.ecs.do_xcom_push = True
+        self.assertEqual(self.ecs.execute(None), None)

Review comment:
       Same here.

##########
File path: tests/providers/amazon/aws/operators/test_ecs.py
##########
@@ -314,3 +314,22 @@ def test_reattach_successful(self, launch_type, tags, start_mock, check_mock, wa
         self.assertEqual(
             self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
         )
+
+    @mock.patch.object(
+        ECSOperator, '_cloudwatch_log_events', return_value=[{"message": "Log output", "timestamp": 1000}]
+    )
+    def test_execute_xcom_with_log(self, _):
+        self.ecs.do_xcom_push = True
+        self.assertEqual(self.ecs.execute(None), "Log output")
+
+    @mock.patch.object(ECSOperator, '_cloudwatch_log_events', return_value=[])
+    def test_execute_xcom_with_no_log(self, _):
+        self.ecs.do_xcom_push = True
+        self.assertEqual(self.ecs.execute(None), None)
+
+    @mock.patch.object(
+        ECSOperator, '_cloudwatch_log_events', return_value=[{"message": "Log output", "timestamp": 1000}]
+    )
+    def test_execute_xcom_disabled(self, _):
+        self.ecs.do_xcom_push = False
+        self.assertEqual(self.ecs.execute(None), None)

Review comment:
       And here.
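The three tests in the suggested style can be sketched in a self-contained way. `FakeECSOperator` is a hypothetical stand-in, not the real `ECSOperator`: its `execute` is assumed to return the message of the last log event when `do_xcom_push` is true and `None` otherwise. As in the review, `mock.patch.object` replaces `_cloudwatch_log_events`, and the assertion references `return_value` instead of repeating the literal string.

```python
import unittest
from unittest import mock


class FakeECSOperator:
    """Hypothetical stand-in; the real operator also starts and waits for the ECS task."""

    def __init__(self, do_xcom_push: bool = True) -> None:
        self.do_xcom_push = do_xcom_push

    def _cloudwatch_log_events(self):
        raise NotImplementedError("patched in tests")

    def execute(self, context):
        if not self.do_xcom_push:
            return None
        last_event = None
        for event in self._cloudwatch_log_events():
            last_event = event  # keep only the most recent event
        return last_event["message"] if last_event is not None else None


class TestFakeECSOperatorXcom(unittest.TestCase):
    def setUp(self):
        self.ecs = FakeECSOperator()

    @mock.patch.object(
        FakeECSOperator, "_cloudwatch_log_events",
        return_value=[{"message": "Log output", "timestamp": 1000}],
    )
    def test_execute_xcom_with_log(self, mock_cloudwatch_log_events):
        self.ecs.do_xcom_push = True
        expected = mock_cloudwatch_log_events.return_value[-1]["message"]
        assert self.ecs.execute(context=None) == expected

    @mock.patch.object(FakeECSOperator, "_cloudwatch_log_events", return_value=[])
    def test_execute_xcom_with_no_log(self, mock_cloudwatch_log_events):
        self.ecs.do_xcom_push = True
        assert self.ecs.execute(context=None) is None

    @mock.patch.object(
        FakeECSOperator, "_cloudwatch_log_events",
        return_value=[{"message": "Log output", "timestamp": 1000}],
    )
    def test_execute_xcom_disabled(self, mock_cloudwatch_log_events):
        self.ecs.do_xcom_push = False
        assert self.ecs.execute(context=None) is None
        mock_cloudwatch_log_events.assert_not_called()
```

With a single-element `return_value`, indexing `[0]` (as in the review suggestion) and `[-1]` pick the same event; `[-1]` matches the "last message" assumption made here.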


[GitHub] [airflow] baxievski commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r532406171



##########
File path: airflow/providers/amazon/aws/operators/ecs.py
##########
@@ -260,6 +267,27 @@ def _wait_for_task_ended(self) -> None:
         waiter.config.max_attempts = sys.maxsize  # timeout is managed by airflow
         waiter.wait(cluster=self.cluster, tasks=[self.arn])
 
+        return
+
+    def _cloudwatch_log_events(self):
+        if not self.awslogs_group:
+            return None
+
+        if not self.awslogs_stream_prefix:
+            return None

Review comment:
       Good point. I am not sure if you had something like this in mind:
   
   ```py
   if self._aws_logs_enabled():
       # do stuff
       ...
   else:
       return None
   ```
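One way to read the `_aws_logs_enabled()` idea, sketched under assumptions: only the helper name comes from the comment above, while the `AwsLogsConfig` class and its `stream_name` method are hypothetical. The requirement check lives in one predicate, replacing the two separate early returns in the diff, and the stream-name format follows the removed lines of the earlier diff.

```python
from typing import Optional


class AwsLogsConfig:
    """Hypothetical stand-in holding the two awslogs settings used by the operator."""

    def __init__(self, awslogs_group: Optional[str], awslogs_stream_prefix: Optional[str]) -> None:
        self.awslogs_group = awslogs_group
        self.awslogs_stream_prefix = awslogs_stream_prefix

    def _aws_logs_enabled(self) -> bool:
        # Both settings are required to locate the CloudWatch log stream.
        return bool(self.awslogs_group and self.awslogs_stream_prefix)

    def stream_name(self, arn: str) -> Optional[str]:
        # A single requirement check instead of several early `return None` branches.
        if self._aws_logs_enabled():
            task_id = arn.split("/")[-1]
            return f"{self.awslogs_stream_prefix}/{task_id}"
        else:
            return None
```

Keeping the predicate separate also lets other call sites (for example, deciding whether to log "ECS Task logs output:") reuse the same rule.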


[GitHub] [airflow] feluelle merged pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
feluelle merged pull request #12096:
URL: https://github.com/apache/airflow/pull/12096


[GitHub] [airflow] baxievski commented on a change in pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
baxievski commented on a change in pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#discussion_r551440334



##########
File path: tests/providers/amazon/aws/operators/test_ecs.py
##########
@@ -314,3 +314,22 @@ def test_reattach_successful(self, launch_type, tags, start_mock, check_mock, wa
         self.assertEqual(
             self.ecs.arn, 'arn:aws:ecs:us-east-1:012345678910:task/d8c67b3c-ac87-4ffe-a847-4785bc3a8b55'
         )
+
+    @mock.patch.object(
+        ECSOperator, '_cloudwatch_log_events', return_value=[{"message": "Log output", "timestamp": 1000}]
+    )
+    def test_execute_xcom_with_log(self, _):
+        self.ecs.do_xcom_push = True
+        self.assertEqual(self.ecs.execute(None), "Log output")

Review comment:
       > LGTM, just a minor "improvement". WDYT? :)
   
   Yeah, definitely an improvement - thanks!
   
   I avoided the `log_message` assignment since it's only used once.


[GitHub] [airflow] boring-cyborg[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-721927417


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/master/docs/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing locally; it's a heavy Docker environment, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack

[GitHub] [airflow] github-actions[bot] commented on pull request #12096: add xcom push for ECSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12096:
URL: https://github.com/apache/airflow/pull/12096#issuecomment-757691161


   The PR is likely OK to be merged with just a subset of tests for the default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest master or amend the last commit of the PR, and push it with --force-with-lease.
