Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/04/22 04:48:11 UTC

[GitHub] [airflow] lokeshlal opened a new pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal opened a new pull request #8509:
URL: https://github.com/apache/airflow/pull/8509


   Added `failed_states` to `ExternalTaskSensor` so that the sensor fails as soon as the external task or DAG fails, instead of waiting until its timeout.
   
   - Added a `failed_states` parameter to `ExternalTaskSensor` (defaults to `None`)
   - Added validation of `failed_states` and `allowed_states`: no value may appear twice, and every value must be a valid state
   - Raise an exception from the `poke` method when the external task or DAG reaches one of the failed states
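To illustrate the motivation, here is a minimal, self-contained simulation in plain Python (not Airflow code) of a sensor polling an external task. The names `run_sensor`, `ExternalTaskFailed`, `SensorTimeout` and the `max_pokes` limit are hypothetical stand-ins for the sensor's poke loop, `AirflowException` and the sensor timeout. The point is only that a non-empty `failed_states` lets the sensor stop at the first poke that sees a failed state instead of waiting out the timeout.

```python
class ExternalTaskFailed(Exception):
    """Hypothetical stand-in for the AirflowException raised on a failed state."""


class SensorTimeout(Exception):
    """Hypothetical stand-in for the error raised when the sensor times out."""


def run_sensor(observed_states, allowed_states=("success",), failed_states=(), max_pokes=5):
    """Poke once per observed state and return the poke number that succeeded.

    Raises ExternalTaskFailed as soon as a failed state is seen, or
    SensorTimeout if no allowed state is seen within max_pokes pokes.
    """
    for poke_number, state in enumerate(observed_states[:max_pokes], start=1):
        if state in failed_states:
            raise ExternalTaskFailed(f"external task failed (poke {poke_number})")
        if state in allowed_states:
            return poke_number
    raise SensorTimeout(f"no allowed state after {max_pokes} pokes")
```

With `observed_states = ["running", "failed", "failed", "failed", "failed"]`, the default `failed_states=()` keeps poking until `SensorTimeout`, while `failed_states=("failed",)` raises `ExternalTaskFailed` on the second poke.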
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] potiuk edited a comment on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-619192000


   > Thank you @potiuk. However it failed again while pushing Prod image and CI image.
   
   Ah... those should be skipped in PRs from forks. Will look at it tomorrow.





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-624403877


   Hi @potiuk @mik-laj, the error seems unrelated to the code in this PR. Could you please restart the job so the checks can run again?
   Thanks.





[GitHub] [airflow] mik-laj edited a comment on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

mik-laj edited a comment on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-624063425


   @lokeshlal Yes. This is the file. It would be very helpful to add an example that describes the new parameter.





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-620384248


   Requesting review of this PR





[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618906849


   Python 3.6.10 was released 10 hours ago. I am now working on a final fix that also handles this case (it was mostly about timeout settings). I will push the latest images now to fix it and will re-run your build.





[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618909136


   This is all a by-product of our emergency move. Like all "move fast" things, it also tends to "break things" (@dimberman, @turbaszek: just a lesson for the three of us that "fast moves" that seem to take hours sometimes require days of fixing afterwards). So we should possibly be a bit more patient next time.





[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-619782161


   Yeah. I think I solved all the problems over the weekend :)





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-623240562


   Hello @potiuk, could you please help review and merge this PR? Thanks.





[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-625253056


   Thanks @lokeshlal !





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-619781319


   Hello @potiuk, do I have to rebase and push the code again?
   
   Thanks





[GitHub] [airflow] lokeshlal commented on issue #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on issue #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618186845


   Hello,
   
   The build is failing in the Coe:Pg_X.X_ job on the following test:
   `TestKillChildProcessesByPids.test_should_kill_process`
   
   Could someone please help identify the issue?
   
   Thanks,
   
   
   





[GitHub] [airflow] mik-laj commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

mik-laj commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-624063425


   @lokeshlal Yes. This is the document. It would be very helpful to add an example that describes the new parameter.





[GitHub] [airflow] lokeshlal commented on issue #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on issue #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618525476


   The build status still says it failed, even though all checks have passed.
   
   Thanks.





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-624145541


   Done. I made the changes in the doc as suggested. Thanks.





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-625224530


   Hello @potiuk, it finally succeeded 👍





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-622374574


   Can someone please look into this PR?





[GitHub] [airflow] lokeshlal commented on issue #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

lokeshlal commented on issue #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618456054


   Hello, could someone please restart the following Travis job?
   
   https://travis-ci.org/github/apache/airflow/jobs/678565944
   
   Thanks,





[GitHub] [airflow] randr97 commented on a change in pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

randr97 commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r414575632



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):
+            raise AirflowException("Duplicate values provided as allowed "
+                                   "`{}` and failed states `{}`"
+                                   .format(self.allowed_states, self.failed_states))
+
         if external_task_id:
-            if not set(self.allowed_states) <= set(State.task_states):
+            if not set(total_states) <= set(State.task_states):
                 raise ValueError(
-                    'Valid values for `allowed_states` '
+                    'Valid values for `allowed_states` and `failed_states` '
                     'when `external_task_id` is not `None`: {}'.format(State.task_states)
                 )
         else:
-            if not set(self.allowed_states) <= set(State.dag_states):
+            if not set(total_states) <= set(State.dag_states):
                 raise ValueError(
-                    'Valid values for `allowed_states` '
+                    'Valid values for `allowed_states` and `failed_states` '
                     'when `external_task_id` is `None`: {}'.format(State.dag_states)
                 )

Review comment:
       ```python
   error_template = ('Valid values for `allowed_states` and `failed_states` '
                     'when `external_task_id` is {}: {}')
   if external_task_id and not set(total_states) <= set(State.task_states):
       raise ValueError(error_template.format('not `None`', State.task_states))
   elif not external_task_id and not set(total_states) <= set(State.dag_states):
       raise ValueError(error_template.format('`None`', State.dag_states))
   ```
   Just my personal view :P
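For comparison, here is a sketch that picks the valid state set first and then checks once, so the two branches cannot fall through into each other. It is plain Python. `TASK_STATES` and `DAG_STATES` are a hypothetical subset standing in for `State.task_states` and `State.dag_states`, and `check_states` is an illustrative helper, not the PR's code.

```python
# Hypothetical subsets standing in for State.task_states / State.dag_states.
TASK_STATES = frozenset({"success", "running", "failed", "upstream_failed",
                         "skipped", "up_for_retry", "queued"})
DAG_STATES = frozenset({"success", "running", "failed"})


def check_states(total_states, external_task_id=None):
    """Raise ValueError if any state is invalid for the kind of object sensed."""
    # Choose the valid set (and the matching message wording) exactly once.
    if external_task_id:
        valid, condition = TASK_STATES, "is not `None`"
    else:
        valid, condition = DAG_STATES, "is `None`"
    invalid = set(total_states) - valid
    if invalid:
        raise ValueError(
            "Valid values for `allowed_states` and `failed_states` "
            f"when `external_task_id` {condition}: {sorted(valid)}"
        )
```

Computing `invalid` as a set difference also makes it easy to report exactly which values were rejected, if that is wanted later.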







[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-624550882


   @lokeshlal please rebase to the latest master. This should fix the problems.





[GitHub] [airflow] jward-bw commented on a change in pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

jward-bw commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r412825231



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +153,58 @@ def poke(self, context, session=None):
                 refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
                     raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'.format(self.external_task_id,
-                                                                                 self.external_dag_id))
+                                           '{} in DAG {} does not exist.'
+                                           .format(self.external_task_id,
+                                                   self.external_dag_id))
             self.has_checked_existence = True
 
+        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+        count_failed = -1
+        if len(self.failed_states) > 0:
+            count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+        session.commit()
+        if count_failed == len(dttm_filter):
+            if self.external_task_id:
+                raise AirflowException('The external task {} in DAG {} failed.'
+                                       .format(self.external_task_id, self.external_dag_id))
+            else:
+                raise AirflowException('The external DAG {} failed.'
+                                       .format(self.external_dag_id))
+
+        return count_allowed == len(dttm_filter)
+
+    def get_count(self, dttm_filter, session, states):
+        """
+        get the count of records against dttm filter and states
+        :param dttm_filter: date time filter for execution date
+        :type dttm_filter: list
+        :param session: airflow session object
+        :type session: SASession
+        :param states: task or dag states
+        :type states: list
+        :return: count of record against the filters
+        """
+        TI = TaskInstance
+        DR = DagRun
+
         if self.external_task_id:
             # .count() is inefficient
             count = session.query(func.count()).filter(
                 TI.dag_id == self.external_dag_id,
                 TI.task_id == self.external_task_id,
-                TI.state.in_(self.allowed_states),
+                TI.state.in_(states),  # pylint: disable=no-member

Review comment:
       Since the comment wasn't here before, though, and the tests passed, I would guess you don't need it here. You only need it on line 204.
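The decision logic in this hunk can be modelled without a database. In the sketch below (plain Python; `poke_decision` and `ExternalTaskFailed` are hypothetical names, and a dict mapping execution date to state stands in for the two `get_count` queries), the `-1` sentinel can never equal the number of dates, so no exception is raised when `failed_states` is empty. Note also that the sensor raises only when *every* date in `dttm_filter` is in a failed state.

```python
class ExternalTaskFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def poke_decision(states_by_date, allowed_states, failed_states):
    """Mirror the diff: count matches per state group, raise if all dates failed."""
    n_dates = len(states_by_date)
    count_allowed = sum(1 for s in states_by_date.values() if s in allowed_states)

    count_failed = -1  # sentinel: can never equal n_dates
    if failed_states:
        count_failed = sum(1 for s in states_by_date.values() if s in failed_states)

    if count_failed == n_dates:
        raise ExternalTaskFailed("The external task failed.")
    return count_allowed == n_dates
```

For example, one `failed` date plus one `running` date gives `count_failed == 1` against two dates, so the sensor keeps waiting rather than raising.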







[GitHub] [airflow] turbaszek commented on a change in pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

turbaszek commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r416414485



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):

Review comment:
       ```suggestion
           if set(self.failed_states).intersection(set(self.allowed_states)):
   ```
   WDYT? Personally, I would cast the states to sets even earlier. 
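The two checks are not quite equivalent. The small plain-Python sketch below uses hypothetical helper names. It shows that the length-based check in the diff also flags a value repeated *within* one list, while the suggested intersection only flags values that appear in both lists.

```python
def has_duplicates(allowed_states, failed_states):
    """Length-based check from the diff: any repeated value at all."""
    total_states = list(allowed_states) + list(failed_states)
    return len(set(total_states)) < len(total_states)


def has_overlap(allowed_states, failed_states):
    """Suggested check: a value listed as both allowed and failed."""
    return bool(set(allowed_states) & set(failed_states))
```

A value repeated inside a single list is harmless to the query. The intersection therefore arguably states the real constraint (a state cannot be both allowed and failed) more directly.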

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):
+            raise AirflowException("Duplicate values provided as allowed "
+                                   "`{}` and failed states `{}`"
+                                   .format(self.allowed_states, self.failed_states))

Review comment:
       Let's use an f-string :)
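For reference, here is the f-string form of this message. This sketch assumes the wording stays unchanged. `duplicate_states_message` is a hypothetical helper, used only so the result can be checked against the `.format()` version.

```python
def duplicate_states_message(allowed_states, failed_states):
    # Same text as the .format() call in the diff, written as an f-string.
    return (f"Duplicate values provided as allowed `{allowed_states}` "
            f"and failed states `{failed_states}`")
```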

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):
+            raise AirflowException("Duplicate values provided as allowed "
+                                   "`{}` and failed states `{}`"
+                                   .format(self.allowed_states, self.failed_states))
+
         if external_task_id:
-            if not set(self.allowed_states) <= set(State.task_states):
+            if not set(total_states) <= set(State.task_states):

Review comment:
       I think we should cast `total_states` to a set in the constructor.
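Here is a minimal sketch of that suggestion, which normalizes states to sets once in the constructor. `SensorStateConfig` is a hypothetical class, not the sensor itself, and the default `"success"` stands in for `State.SUCCESS`. One side effect is worth noting: converting to sets silently collapses a value repeated within a single list.

```python
class SensorStateConfig:
    """Hypothetical holder showing set normalization in __init__."""

    def __init__(self, allowed_states=None, failed_states=None):
        self.allowed_states = set(allowed_states or ["success"])
        self.failed_states = set(failed_states or [])
        overlap = self.allowed_states & self.failed_states
        if overlap:
            raise ValueError(f"States cannot be both allowed and failed: {sorted(overlap)}")
        # Later validity checks can use this directly, e.g. total_states <= valid_states.
        self.total_states = self.allowed_states | self.failed_states
```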

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +152,58 @@ def poke(self, context, session=None):
                 refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
                     raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'.format(self.external_task_id,
-                                                                                 self.external_dag_id))
+                                           '{} in DAG {} does not exist.'
+                                           .format(self.external_task_id,
+                                                   self.external_dag_id))
             self.has_checked_existence = True
 
+        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+        count_failed = -1
+        if len(self.failed_states) > 0:
+            count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+        session.commit()
+        if count_failed == len(dttm_filter):
+            if self.external_task_id:
+                raise AirflowException('The external task {} in DAG {} failed.'
+                                       .format(self.external_task_id, self.external_dag_id))
+            else:
+                raise AirflowException('The external DAG {} failed.'
+                                       .format(self.external_dag_id))

Review comment:
       Let's use an f-string.
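The same applies to the two failure messages in `poke`. Below is a sketch with a hypothetical helper, so the output can be checked. Returning early also removes the `else` after a terminating statement, which pylint can flag (`no-else-raise` in the original form).

```python
def external_failure_message(external_dag_id, external_task_id=None):
    # f-string equivalents of the two messages raised in poke().
    if external_task_id:
        return f"The external task {external_task_id} in DAG {external_dag_id} failed."
    return f"The external DAG {external_dag_id} failed."
```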

##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)

Review comment:
       ```suggestion
           total_states = self.allowed_states + self.failed_states
   ```
   WDYT?
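There is one caveat with `+`, sketched here in plain Python. Suppose a caller passes a tuple for `allowed_states` while `failed_states` falls back to the `[]` default. Then `tuple + list` raises `TypeError`, whereas `extend()` accepts any iterable. Unpacking into a new list keeps the one-liner and tolerates mixed iterables. `combine_states` is a hypothetical helper.

```python
def combine_states(allowed_states, failed_states):
    # Works for any mix of lists, tuples or sets, unlike allowed + failed.
    return [*allowed_states, *failed_states]
```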







[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-619192000


   > Thank you @potiuk. However it failed again while pushing Prod image and CI image.
   
   Ah... those should be skipped in PRs from forks. Will look at it tomorrow.





[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618907950


   Once I merge my last change (currently being tested here: https://github.com/potiuk/airflow/commit/8abbb662121a608ada70e97f13db26516163e166), this will stop being a problem. We will get much faster image updates when Python patch releases come out.





[GitHub] [airflow] jward-bw commented on a change in pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
jward-bw commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r412822312



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +153,58 @@ def poke(self, context, session=None):
                 refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
                     raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'.format(self.external_task_id,
-                                                                                 self.external_dag_id))
+                                           '{} in DAG {} does not exist.'
+                                           .format(self.external_task_id,
+                                                   self.external_dag_id))
             self.has_checked_existence = True
 
+        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+        count_failed = -1
+        if len(self.failed_states) > 0:
+            count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+        session.commit()
+        if count_failed == len(dttm_filter):
+            if self.external_task_id:
+                raise AirflowException('The external task {} in DAG {} failed.'
+                                       .format(self.external_task_id, self.external_dag_id))
+            else:
+                raise AirflowException('The external DAG {} failed.'
+                                       .format(self.external_dag_id))
+
+        return count_allowed == len(dttm_filter)
+
+    def get_count(self, dttm_filter, session, states):
+        """
+        get the count of records against dttm filter and states
+        :param dttm_filter: date time filter for execution date
+        :type dttm_filter: list
+        :param session: airflow session object
+        :type session: SASession
+        :param states: task or dag states
+        :type states: list
+        :return: count of record against the filters
+        """
+        TI = TaskInstance
+        DR = DagRun
+
         if self.external_task_id:
             # .count() is inefficient
             count = session.query(func.count()).filter(
                 TI.dag_id == self.external_dag_id,
                 TI.task_id == self.external_task_id,
-                TI.state.in_(self.allowed_states),
+                TI.state.in_(states),
                 TI.execution_date.in_(dttm_filter),
             ).scalar()
         else:
             # .count() is inefficient
             count = session.query(func.count()).filter(
                 DR.dag_id == self.external_dag_id,
-                DR.state.in_(self.allowed_states),  # pylint: disable=no-member

Review comment:
       You need to keep this pylint comment here.
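To make the decision logic in this hunk easier to follow, here is a minimal, database-free sketch of it. It is a model under stated assumptions, not the sensor itself: `count_in_states` and `poke_outcome` are hypothetical names, states are plain strings, an in-memory dict from execution date to state stands in for the `TaskInstance`/`DagRun` count query, and `ExternalFailure` stands in for `AirflowException`.

```python
from datetime import datetime
from typing import Dict, List, Sequence


class ExternalFailure(Exception):
    """Hypothetical stand-in for AirflowException in this sketch."""


def count_in_states(runs: Dict[datetime, str],
                    dttm_filter: Sequence[datetime],
                    states: Sequence[str]) -> int:
    # Mirrors get_count(): how many watched execution dates are in one of `states`.
    return sum(1 for dttm in dttm_filter if runs.get(dttm) in states)


def poke_outcome(runs: Dict[datetime, str],
                 dttm_filter: List[datetime],
                 allowed_states: Sequence[str],
                 failed_states: Sequence[str]) -> bool:
    count_allowed = count_in_states(runs, dttm_filter, allowed_states)

    # -1 means "no failed_states configured", so it can never equal len(dttm_filter).
    count_failed = -1
    if failed_states:
        count_failed = count_in_states(runs, dttm_filter, failed_states)

    # Fail fast only when every watched date is in a failed state.
    if count_failed == len(dttm_filter):
        raise ExternalFailure("The external task reached a failed state.")

    # Otherwise succeed only when every watched date is in an allowed state.
    return count_allowed == len(dttm_filter)
```

With a single execution date in state `running`, `poke_outcome` returns `False` (keep poking); `success` returns `True`; `failed` raises. As in the hunk, the failure branch only fires when every date in `dttm_filter` is in a failed state, so a mix of failed and still-running dates keeps the sensor waiting until its timeout.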





[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618917690


   Pushing the latest images now, @lokeshlal. Once this is done I will restart the CI build.



[GitHub] [airflow] lokeshlal commented on a change in pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
lokeshlal commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r412900398



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -141,28 +153,58 @@ def poke(self, context, session=None):
                 refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
                 if not refreshed_dag_info.has_task(self.external_task_id):
                     raise AirflowException('The external task'
-                                           '{} in DAG {} does not exist.'.format(self.external_task_id,
-                                                                                 self.external_dag_id))
+                                           '{} in DAG {} does not exist.'
+                                           .format(self.external_task_id,
+                                                   self.external_dag_id))
             self.has_checked_existence = True
 
+        count_allowed = self.get_count(dttm_filter, session, self.allowed_states)
+
+        count_failed = -1
+        if len(self.failed_states) > 0:
+            count_failed = self.get_count(dttm_filter, session, self.failed_states)
+
+        session.commit()
+        if count_failed == len(dttm_filter):
+            if self.external_task_id:
+                raise AirflowException('The external task {} in DAG {} failed.'
+                                       .format(self.external_task_id, self.external_dag_id))
+            else:
+                raise AirflowException('The external DAG {} failed.'
+                                       .format(self.external_dag_id))
+
+        return count_allowed == len(dttm_filter)
+
+    def get_count(self, dttm_filter, session, states):
+        """
+        get the count of records against dttm filter and states
+        :param dttm_filter: date time filter for execution date
+        :type dttm_filter: list
+        :param session: airflow session object
+        :type session: SASession
+        :param states: task or dag states
+        :type states: list
+        :return: count of record against the filters
+        """
+        TI = TaskInstance
+        DR = DagRun
+
         if self.external_task_id:
             # .count() is inefficient
             count = session.query(func.count()).filter(
                 TI.dag_id == self.external_dag_id,
                 TI.task_id == self.external_task_id,
-                TI.state.in_(self.allowed_states),
+                TI.state.in_(states),  # pylint: disable=no-member

Review comment:
       Thanks jward-bw





[GitHub] [airflow] randr97 commented on a change in pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
randr97 commented on a change in pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#discussion_r414575632



##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -68,23 +70,35 @@ def __init__(self,
                  external_dag_id,
                  external_task_id=None,
                  allowed_states=None,
+                 failed_states=None,
                  execution_delta=None,
                  execution_date_fn=None,
                  check_existence=False,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
         self.allowed_states = allowed_states or [State.SUCCESS]
+        self.failed_states = failed_states or []
+
+        total_states = []
+        total_states.extend(self.allowed_states)
+        total_states.extend(self.failed_states)
+
+        if len(list(set(total_states))) < (len(self.failed_states) + len(self.allowed_states)):
+            raise AirflowException("Duplicate values provided as allowed "
+                                   "`{}` and failed states `{}`"
+                                   .format(self.allowed_states, self.failed_states))
+
         if external_task_id:
-            if not set(self.allowed_states) <= set(State.task_states):
+            if not set(total_states) <= set(State.task_states):
                 raise ValueError(
-                    'Valid values for `allowed_states` '
+                    'Valid values for `allowed_states` and `failed_states` '
                     'when `external_task_id` is not `None`: {}'.format(State.task_states)
                 )
         else:
-            if not set(self.allowed_states) <= set(State.dag_states):
+            if not set(total_states) <= set(State.dag_states):
                 raise ValueError(
-                    'Valid values for `allowed_states` '
+                    'Valid values for `allowed_states` and `failed_states` '
                     'when `external_task_id` is `None`: {}'.format(State.dag_states)
                 )

Review comment:
       error_template = ('Valid values for `allowed_states` and `failed_states` '
                         'when `external_task_id` is {}`None`: {}')
       if external_task_id:
           if not set(total_states) <= set(State.task_states):
               raise ValueError(error_template.format('not ', State.task_states))
       elif not set(total_states) <= set(State.dag_states):
           raise ValueError(error_template.format('', State.dag_states))

       Just my personal view :P
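The two checks added to `__init__` (no state listed twice, and every state valid for the kind of object being watched) can be exercised in isolation. The sketch below is hypothetical: `validate_states` is not an Airflow function, the `TASK_STATES`/`DAG_STATES` sets are abbreviated stand-ins for `State.task_states` and `State.dag_states`, and it raises `ValueError` for both checks, whereas the diff raises `AirflowException` for duplicates.

```python
from typing import List, Optional

# Hypothetical, abbreviated stand-ins for Airflow's State.task_states / State.dag_states.
TASK_STATES = {"success", "failed", "running", "queued", "skipped", "upstream_failed"}
DAG_STATES = {"success", "failed", "running"}


def validate_states(allowed_states: Optional[List[str]],
                    failed_states: Optional[List[str]],
                    external_task_id: Optional[str]) -> None:
    """Raise ValueError if the configured states are duplicated or invalid."""
    allowed = allowed_states or ["success"]  # same default as the PR
    failed = failed_states or []
    total = allowed + failed

    # A state appearing twice, within one list or across both, is rejected.
    if len(set(total)) < len(total):
        raise ValueError(
            f"Duplicate values provided as allowed `{allowed}` and failed states `{failed}`")

    # Pick the vocabulary once, then do a single subset check.
    if external_task_id:
        valid, qualifier = TASK_STATES, "not "
    else:
        valid, qualifier = DAG_STATES, ""
    if not set(total) <= valid:
        raise ValueError(
            "Valid values for `allowed_states` and `failed_states` "
            f"when `external_task_id` is {qualifier}`None`: {sorted(valid)}")
```

For example, `validate_states(None, ["failed"], "my_task")` passes (`my_task` is a placeholder id), while `validate_states(["success"], ["upstream_failed"], None)` raises because `upstream_failed` is not a DAG state. Choosing the vocabulary first and then doing a single subset check keeps one error template while ensuring a valid task state is never compared against the DAG states.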





[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-624029003


   Hi @mik-laj 
   
   Just to confirm: should the original document https://github.com/apache/airflow/blob/master/docs/howto/operator/external_task_sensor.rst be edited to describe the functionality related to allowed and failed states?



[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618948698


   Thank you @potiuk. However, it failed again while pushing the Prod and CI images.



[GitHub] [airflow] mik-laj commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-623924193


   Can you add some docs to the guide?
   https://airflow.readthedocs.io/en/latest/howto/operator/external_task_sensor.html
   CC: @Acehaidrey  



[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-620990248


   Thank you @turbaszek. I have incorporated all the changes.



[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618901571


   Not sure what the problem is. Somehow the build is not passing.



[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-619844007


   Thank you @potiuk. It finally succeeded.



[GitHub] [airflow] potiuk commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618926848


   OK. I restarted the build and it seems to use the new images now - it will be much faster :)
   



[GitHub] [airflow] lokeshlal commented on issue #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
lokeshlal commented on issue #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-617720329


   Hello, the above build failed with the following error:
   
   ##[error]Process completed with exit code 137.
   
   If this is because of Docker container memory usage, could someone please help?
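For context on the error above: by shell convention an exit status above 128 means the process was terminated by signal number (status - 128), so 137 corresponds to signal 9, SIGKILL. The Linux OOM killer, including when a container exceeds its memory limit, terminates processes with SIGKILL, so 137 is consistent with, though not proof of, a memory problem. A small sketch for decoding such codes, assuming a POSIX platform where Python's `signal.Signals` enum includes SIGKILL; `signal_from_exit_code` is a hypothetical helper:

```python
import signal
from typing import Optional


def signal_from_exit_code(code: int) -> Optional[str]:
    """Return the signal name for a shell-style exit code above 128, else None."""
    if code <= 128:
        return None
    try:
        # Shell convention: exit status = 128 + signal number.
        return signal.Signals(code - 128).name
    except ValueError:
        # 128 + n where n is not a known signal number on this platform.
        return None
```

`signal_from_exit_code(137)` returns `"SIGKILL"` and `signal_from_exit_code(143)` returns `"SIGTERM"`. Note that Python's own `subprocess` reports a signal-killed child as a negative `returncode` (e.g. `-9`), so this 128 + n decoding applies to statuses reported by a shell or by Docker.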



[GitHub] [airflow] potiuk edited a comment on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

Posted by GitBox <gi...@apache.org>.
potiuk edited a comment on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-618926848


   OK. I restarted the build and it seems to use the new images now - it will be much faster :)
   

