Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/01/09 19:33:58 UTC

[GitHub] [airflow] rconroy293 opened a new pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

rconroy293 opened a new pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119
 
 
   This adds an "operator extra link" to external task sensors by default, linking to the graph view of the DAG run they are waiting for. If the sensor waits for multiple execution dates, no link is provided.
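The link behavior described above can be sketched in plain Python. This is a minimal sketch, not the PR's code: the `BASE_URL` constant, the `/graph?dag_id=...&execution_date=...` URL shape, and the helper name `build_external_dag_link` are assumptions for illustration.

```python
from datetime import datetime, timedelta
from typing import List, Optional
from urllib.parse import urlencode

# Assumed webserver base URL, for illustration only.
BASE_URL = "http://localhost:8080"


def build_external_dag_link(external_dag_id: str,
                            target_dates: List[datetime]) -> Optional[str]:
    """Return a graph-view URL for a single target run, or None."""
    if len(target_dates) != 1:
        # The sensor waits on several runs, so there is no single run to link to.
        return None
    query = urlencode({
        "dag_id": external_dag_id,
        "execution_date": target_dates[0].isoformat(),
    })
    return f"{BASE_URL}/graph?{query}"


run = datetime(2020, 1, 9)
single = build_external_dag_link("upstream_dag", [run - timedelta(hours=1)])
multiple = build_external_dag_link("upstream_dag", [run, run - timedelta(days=1)])
```

Returning `None` for the multi-date case keeps the UI from guessing which of several runs the user meant.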
   
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] kaxil removed a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil removed a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-573232828
 
 
   > One thing I'm struggling with is how to get the `execution_delta` parameter to deserialize as a `timedelta`. Right now, it looks like it serializes to a float (as number of seconds) and doesn't get converted back on deserialization. I could manually convert it here https://github.com/apache/airflow/pull/7119/files#diff-108fa7b4349eeb1a31c07c64a952c972R52, but that seems a bit hacky to me. Looking at the `_deserialize` function here https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L200, it seems like there is the possibility to have it marked as a `timedelta`, but I can't seem to figure out where I would do that. Do I need to add typing to the property, or should I define `__serialized_fields` differently?
   
   We already have timedelta: 
   On Serialization: https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L169
   On De-serialization: https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L222
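A type-tagged encoding is what lets a `timedelta` survive a JSON round trip. The sketch below is a simplified illustration, not Airflow's serializer; the `__type`/`__var` key names and the `serialize`/`deserialize` helpers are assumptions.

```python
import json
from datetime import timedelta

# Assumed tag keys for this sketch.
TYPE_KEY, VALUE_KEY = "__type", "__var"


def serialize(value):
    """Wrap values that JSON cannot represent natively in a typed envelope."""
    if isinstance(value, timedelta):
        return {TYPE_KEY: "timedelta", VALUE_KEY: value.total_seconds()}
    return value


def deserialize(encoded):
    """Rebuild the original type from the envelope, if there is one."""
    if isinstance(encoded, dict) and encoded.get(TYPE_KEY) == "timedelta":
        return timedelta(seconds=encoded[VALUE_KEY])
    return encoded


delta = timedelta(hours=1)
wire = json.dumps(serialize(delta))
restored = deserialize(json.loads(wire))

# Without the envelope only a float survives, and nothing marks it as a timedelta.
untagged = deserialize(json.loads(json.dumps(delta.total_seconds())))
```

The `untagged` case reproduces the symptom from the question: the value comes back as `3600.0` rather than a `timedelta`.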
   


[GitHub] [airflow] yuqian90 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366633960
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   Oh, that's unfortunate if we can't make the existing `execution_date_fn` feature work here, because I assume many people use `execution_date_fn` to shift `execution_date` around according to some custom logic of their own.
   
   I have one suggestion that might work. If we know the `task_id` and `dag_id` of the `operator`, we can always find the real `ExternalTaskSensor` object (e.g. by looking it up in the `DagBag`). Once we have a reference to the real `ExternalTaskSensor` object, we can call its `execution_date_fn`.
   
   If that doesn't work, I think it's important to make sure the extra link is not generated when the `ExternalTaskSensor` uses `execution_date_fn`. In the current state of the PR, the link would point to the wrong `execution_date` whenever the user relies on `execution_date_fn`, which will cause confusion. A comment stating that `execution_date_fn` is not supported for the extra link would also help.
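A minimal sketch of the fallback suggested above: skip the link whenever `execution_date_fn` is set, rather than pointing at a possibly wrong run. The operator is faked with `SimpleNamespace`, and the relative `/graph?...` URL and the `external_link` name are assumptions, not the PR's implementation.

```python
from datetime import datetime, timedelta
from types import SimpleNamespace
from typing import Optional
from urllib.parse import urlencode


def external_link(operator, dttm: datetime) -> Optional[str]:
    """Return a link to the external run, or None when it cannot be trusted."""
    if getattr(operator, "execution_date_fn", None) is not None:
        # With DAG serialization the function is not available to the link,
        # so the computed date could be wrong; emit no link instead.
        return None
    delta = getattr(operator, "execution_delta", None)
    target = dttm - delta if delta else dttm
    query = urlencode({"dag_id": operator.external_dag_id,
                       "execution_date": target.isoformat()})
    return f"/graph?{query}"


with_delta = SimpleNamespace(external_dag_id="upstream",
                             execution_delta=timedelta(hours=1),
                             execution_date_fn=None)
with_fn = SimpleNamespace(external_dag_id="upstream",
                          execution_delta=None,
                          execution_date_fn=lambda d: d - timedelta(days=1))

run = datetime(2020, 1, 9)
link = external_link(with_delta, run)
no_link = external_link(with_fn, run)
```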


[GitHub] [airflow] kaxil commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-573258310
 
 
   > One thing I'm struggling with is how to get the `execution_delta` parameter to deserialize as a `timedelta`. Right now, it looks like it serializes to a float (as number of seconds) and doesn't get converted back on deserialization. I could manually convert it here https://github.com/apache/airflow/pull/7119/files#diff-108fa7b4349eeb1a31c07c64a952c972R52, but that seems a bit hacky to me. Looking at the `_deserialize` function here https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L200, it seems like there is the possibility to have it marked as a `timedelta`, but I can't seem to figure out where I would do that. Do I need to add typing to the property, or should I define `__serialized_fields` differently?
   
   We have 2 options:
   
   1) Add `execution_delta` at https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L343
   
   2) Add `execution_delta` to `_decorated_fields` (https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L273)
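A toy model of why the value came back as a float, and of the two fixes listed above. It assumes, for illustration, that fields outside the decorated set are stored as bare values with their type tag dropped; `serialize_fields`, `deserialize_fields`, and `timedelta_fields` are hypothetical names, not Airflow APIs.

```python
from datetime import timedelta

TYPE_KEY, VALUE_KEY = "__type", "__var"


def encode(value):
    if isinstance(value, timedelta):
        return {TYPE_KEY: "timedelta", VALUE_KEY: value.total_seconds()}
    return value


def serialize_fields(fields: dict, decorated: frozenset) -> dict:
    out = {}
    for key, value in fields.items():
        encoded = encode(value)
        if key not in decorated and isinstance(encoded, dict) and TYPE_KEY in encoded:
            # Assumed behaviour: non-decorated fields lose their type tag.
            encoded = encoded[VALUE_KEY]
        out[key] = encoded
    return out


def deserialize_fields(data: dict, decorated: frozenset,
                       timedelta_fields: frozenset = frozenset()) -> dict:
    out = {}
    for key, value in data.items():
        if key in decorated and isinstance(value, dict) and value.get(TYPE_KEY) == "timedelta":
            # Option 2: the tag survived, so the type can be rebuilt.
            value = timedelta(seconds=value[VALUE_KEY])
        elif key in timedelta_fields:
            # Option 1: explicitly convert a known field from bare seconds.
            value = timedelta(seconds=value)
        out[key] = value
    return out


fields = {"execution_delta": timedelta(hours=1)}
plain = deserialize_fields(serialize_fields(fields, frozenset()), frozenset())
option1 = deserialize_fields(serialize_fields(fields, frozenset()), frozenset(),
                             frozenset({"execution_delta"}))
decorated = frozenset({"execution_delta"})
option2 = deserialize_fields(serialize_fields(fields, decorated), decorated)
```

Option 1 keeps the wire format compact but needs a per-field rule on the reading side; option 2 keeps the type information in the payload itself.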


[GitHub] [airflow] boring-cyborg[bot] commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-572719109
 
 
   Congratulations on your first Pull Request and welcome to the Apache Airflow community!
   If you have any issues or are unsure about anything, please check our
   Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
   
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, pylint and type annotations). Our [pre-commits](
   https://github.com/apache/airflow/blob/master/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks)
   will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory).
   Adding a new operator? Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/master/BREEZE.rst) for testing
   locally. It’s a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from
   Committers.
   
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://apache-airflow-slack.herokuapp.com/
   


[GitHub] [airflow] yuqian90 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r379879418
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   @rconroy293, any interest in pursuing XCom further for implementing the link? If not, I might give this a try and put up a PR sometime next week.


[GitHub] [airflow] rconroy293 commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
rconroy293 commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-574305768
 
 
   Alright, I think things are in a good state, @kaxil; the serialization side should be fine now. The failing build is labeled `kerberos-temporarily-disabled`, so I assume that's okay.


[GitHub] [airflow] codecov-io edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-572737962
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=h1) Report
   > Merging [#7119](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/28d8ef1eff901cce36e6c3fa53b944a1cf7fa00a?src=pr&el=desc) will **increase** coverage by `0.68%`.
   > The diff coverage is `93.75%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7119      +/-   ##
   ==========================================
   + Coverage   85.17%   85.86%   +0.68%     
   ==========================================
     Files         683      754      +71     
     Lines       39155    42887    +3732     
   ==========================================
   + Hits        33352    36823    +3471     
   - Misses       5803     6064     +261
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/sensors/external\_task\_sensor.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZW5zb3JzL2V4dGVybmFsX3Rhc2tfc2Vuc29yLnB5) | `89.36% <100%> (+3.83%)` | :arrow_up: |
   | [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.03% <100%> (ø)` | :arrow_up: |
   | [airflow/plugins\_manager.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9wbHVnaW5zX21hbmFnZXIucHk=) | `89.75% <60%> (-0.63%)` | :arrow_down: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `45.25% <0%> (-46.72%)` | :arrow_down: |
   | [airflow/security/kerberos.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZWN1cml0eS9rZXJiZXJvcy5weQ==) | `30.43% <0%> (-45.66%)` | :arrow_down: |
   | [...rflow/contrib/operators/kubernetes\_pod\_operator.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZF9vcGVyYXRvci5weQ==) | `71.42% <0%> (-27.37%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [airflow/contrib/hooks/azure\_data\_lake\_hook.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL2hvb2tzL2F6dXJlX2RhdGFfbGFrZV9ob29rLnB5) | `81.81% <0%> (-11.29%)` | :arrow_down: |
   | ... and [241 more](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=footer). Last update [28d8ef1...dce9882](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] kaxil commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-590573136
 
 
   Thanks @rconroy293, this looks good. Can you please resolve the conflicts?


[GitHub] [airflow] kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r365482592
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,52 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    if execution_delta:
 
 Review comment:
   Can you add docstrings for this function
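For reference, here is the function from the diff as a standalone, runnable snippet, with a docstring and type hints added and the logic unchanged. The `DateOrDates` alias is introduced here only for readability.

```python
import datetime
from typing import Callable, List, Optional, Union

DateOrDates = Union[datetime.datetime, List[datetime.datetime]]


def get_possible_target_execution_dates(
    execution_date: datetime.datetime,
    execution_delta: Optional[datetime.timedelta],
    execution_date_fn: Optional[Callable[[datetime.datetime], DateOrDates]],
) -> List[datetime.datetime]:
    """Return the execution date(s) of the external DAG run(s) to wait for.

    A non-null ``execution_delta`` is subtracted from ``execution_date``
    (a positive delta looks back in time); otherwise ``execution_date_fn``
    is applied; otherwise ``execution_date`` itself is used. The result is
    always a list.
    """
    if execution_delta:
        dttm = execution_date - execution_delta
    elif execution_date_fn:
        dttm = execution_date_fn(execution_date)
    else:
        dttm = execution_date
    return dttm if isinstance(dttm, list) else [dttm]


base = datetime.datetime(2020, 1, 9, 12)
by_delta = get_possible_target_execution_dates(base, datetime.timedelta(hours=2), None)
by_fn = get_possible_target_execution_dates(
    base, None, lambda d: [d - datetime.timedelta(days=n) for n in (1, 2)])
default = get_possible_target_execution_dates(base, None, None)
```

Because `timedelta(0)` is falsy, a zero delta falls through to the next branch; with no `execution_date_fn` the result is the same date either way.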


[GitHub] [airflow] ashb commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r367386381
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   No, looking up the DAG in the DagBag won't work: the whole goal of enabling serialization is to avoid running any code from the DAG files on the webserver (or ideally to not even need access to the files at all!).
   
   To support the case where DAG serialization is not (yet) enabled, the only option is `getattr(operator, 'execution_date_fn')`. The other option might be to have the operator store the result of the function in XCom, and then have the link look up that result from XCom.
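The XCom idea can be sketched with an in-memory dictionary standing in for XCom: the sensor, which runs on a worker and may call user code, stores the computed dates, and the link only reads stored values. Everything here (`_XCOM`, `xcom_push`, `xcom_pull`, `TARGET_DATES_KEY`, `sensor_poke`, `link_from_xcom`) is a hypothetical stand-in, not Airflow's XCom API.

```python
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple
from urllib.parse import urlencode

# Hypothetical in-memory stand-in for XCom, keyed like a task instance.
XComKey = Tuple[str, str, datetime, str]
_XCOM: Dict[XComKey, object] = {}
TARGET_DATES_KEY = "external_target_dates"  # hypothetical key name


def xcom_push(dag_id: str, task_id: str, execution_date: datetime, key: str, value) -> None:
    _XCOM[(dag_id, task_id, execution_date, key)] = value


def xcom_pull(dag_id: str, task_id: str, execution_date: datetime, key: str):
    return _XCOM.get((dag_id, task_id, execution_date, key))


def sensor_poke(dag_id, task_id, execution_date, execution_date_fn) -> None:
    """Worker side: run the user function and store its result as strings."""
    dates = execution_date_fn(execution_date)
    dates = dates if isinstance(dates, list) else [dates]
    xcom_push(dag_id, task_id, execution_date, TARGET_DATES_KEY,
              [d.isoformat() for d in dates])


def link_from_xcom(dag_id, task_id, execution_date, external_dag_id) -> Optional[str]:
    """Webserver side: read stored dates only, never call user code."""
    stored = xcom_pull(dag_id, task_id, execution_date, TARGET_DATES_KEY)
    if not stored or len(stored) != 1:
        return None  # sensor has not run yet, or several runs are targeted
    query = urlencode({"dag_id": external_dag_id, "execution_date": stored[0]})
    return f"/graph?{query}"


run = datetime(2020, 1, 9)
before = link_from_xcom("my_dag", "wait_for_upstream", run, "upstream")
sensor_poke("my_dag", "wait_for_upstream", run, lambda d: d - timedelta(days=1))
after = link_from_xcom("my_dag", "wait_for_upstream", run, "upstream")
```

The trade-off is that no link exists until the sensor has run at least once for that execution date.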


[GitHub] [airflow] rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366459866
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   That's correct. I don't think we can serialize/deserialize functions, so that approach would be incompatible with Airflow 2.0.
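A quick illustration of why a callable cannot travel through a JSON-based serialization while a `timedelta` stored as seconds can. The field names mirror the sensor's parameters; `shift_back_one_day` is a hypothetical example function.

```python
import json
from datetime import timedelta


def shift_back_one_day(dttm):
    """Hypothetical execution_date_fn."""
    return dttm - timedelta(days=1)


# A timedelta stored as seconds is plain JSON.
delta_json = json.dumps({"execution_delta": timedelta(hours=1).total_seconds()})

# A function object is not: json.dumps raises TypeError.
try:
    json.dumps({"execution_date_fn": shift_back_one_day})
    fn_serializable = True
except TypeError:
    fn_serializable = False

# Storing only the name does not help either: if the function lives in the
# DAG file, the webserver would still have to import that file to get it
# back, and lambdas have no usable name at all.
lambda_name = (lambda d: d).__qualname__
```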


[GitHub] [airflow] kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r370102721
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   Let me know if you want to use XCom.


[GitHub] [airflow] kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r368285269
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   @rconroy293 Below is how you could use XCom:
   
   https://github.com/apache/airflow/blob/5878e48b787797c54b4138bc0166859232ea08d6/airflow/gcp/operators/bigquery.py#L327-L336


[GitHub] [airflow] rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r367720047
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   @ashb I like the idea of using XCom. How would we be able to do an `xcom_pull` in the `get_link` method? It looks like we only have access to the serialized operator and the execution date, not to a task instance.
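One possible answer, sketched under assumptions: XCom rows are identified by (dag_id, task_id, execution_date, key), and a deserialized operator still carries `dag_id` and `task_id`, so `get_link(operator, dttm)` has everything needed to locate a value the sensor pushed during `execute`. The BigQuery example linked above appears to do the equivalent by building a `TaskInstance` from the operator and date and calling `xcom_pull` on it. The sketch below swaps the real XCom table for a hypothetical in-memory dict; `SensorSketch`, `XCOM_STORE`, `TARGETS_KEY` and the `/graph?...` URL shape are all illustrative.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlencode

# Hypothetical in-memory stand-in for the XCom table, keyed by the same
# tuple that identifies an XCom row.
XComKey = Tuple[str, str, datetime, str]
XCOM_STORE: Dict[XComKey, Any] = {}
TARGETS_KEY = "external_execution_dates"  # hypothetical XCom key name


class SensorSketch:
    """Hypothetical sensor; only plain attributes survive serialization."""

    def __init__(self, dag_id: str, task_id: str, external_dag_id: str,
                 execution_delta: Optional[timedelta] = None):
        self.dag_id = dag_id
        self.task_id = task_id
        self.external_dag_id = external_dag_id
        self.execution_delta = execution_delta

    def execute(self, execution_date: datetime) -> None:
        # At run time the full operator is available; a real sensor could
        # also apply execution_date_fn here (this sketch only uses the delta).
        if self.execution_delta:
            targets = [execution_date - self.execution_delta]
        else:
            targets = [execution_date]
        XCOM_STORE[(self.dag_id, self.task_id, execution_date, TARGETS_KEY)] = [
            t.isoformat() for t in targets
        ]


def get_link(operator: Any, dttm: datetime) -> Optional[str]:
    # dag_id, task_id and dttm are enough to rebuild the XCom key; no
    # TaskInstance object or execution_date_fn is needed here.
    targets: Optional[List[str]] = XCOM_STORE.get(
        (operator.dag_id, operator.task_id, dttm, TARGETS_KEY))
    if not targets or len(targets) != 1:
        return None  # not run yet, or several dates: no single run to link
    # Hypothetical URL shape for the graph view.
    return "/graph?" + urlencode(
        {"dag_id": operator.external_dag_id, "execution_date": targets[0]})


sensor = SensorSketch("my_dag", "wait_for_upstream", "upstream_dag",
                      execution_delta=timedelta(hours=1))
run = datetime(2020, 1, 9, 12)
print(get_link(sensor, run))  # None: nothing pushed yet
sensor.execute(run)
print(get_link(sensor, run))
# /graph?dag_id=upstream_dag&execution_date=2020-01-09T11%3A00%3A00
```

A side effect of this design is that the dates are resolved where the real operator runs, so an `execution_date_fn` could be honoured there too; the trade-off is that the link only appears after the sensor has executed.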


[GitHub] [airflow] kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r365025063
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -21,14 +21,31 @@
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        if isinstance(operator, ExternalTaskSensor):
+            possible_execution_dates = operator.get_possible_target_execution_dates(execution_date=dttm)
 
 Review comment:
   This won't work with DAG Serialization because it calls this method on the operator, which is not available once the operator has been deserialized.
   
   Check the Qubole Operator and how the test is implemented for it:
   
   https://github.com/apache/airflow/blob/803a87f2b2b564b3e8784de305f5bb4e9578e578/tests/contrib/operators/test_qubole_operator.py#L146-L176 
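As we understand it, with DAG Serialization enabled the webserver works with an operator rebuilt from JSON rather than an instance of the original sensor class, so both the `isinstance` check and the method call in the diff fail. A minimal sketch of the difference, using hypothetical stand-in classes (`SensorSketch`, `SerializedOperatorSketch`) rather than Airflow's real ones:

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


class SensorSketch:
    """Hypothetical stand-in for ExternalTaskSensor as defined in the DAG file."""

    def __init__(self, external_dag_id: str, execution_delta: Optional[timedelta] = None):
        self.external_dag_id = external_dag_id
        self.execution_delta = execution_delta

    def get_possible_target_execution_dates(self, execution_date: datetime) -> List[datetime]:
        if self.execution_delta:
            return [execution_date - self.execution_delta]
        return [execution_date]


class SerializedOperatorSketch:
    """Hypothetical stand-in for the generic object the webserver rebuilds:
    it carries the serialized attributes but not the original class or methods."""

    def __init__(self, fields: Dict[str, Any]):
        for name, value in fields.items():
            setattr(self, name, value)


def round_trip(op: SensorSketch) -> SerializedOperatorSketch:
    return SerializedOperatorSketch(dict(vars(op)))


def dates_via_method(operator: Any, dttm: datetime) -> Optional[List[datetime]]:
    # Pattern in the diff: needs the concrete class and its methods.
    if isinstance(operator, SensorSketch):
        return operator.get_possible_target_execution_dates(dttm)
    return None


def dates_via_attributes(operator: Any, dttm: datetime) -> List[datetime]:
    # Serialization-friendly pattern: read plain attributes only.
    delta = getattr(operator, "execution_delta", None)
    return [dttm - delta] if delta else [dttm]


sensor = SensorSketch("upstream_dag", timedelta(hours=1))
run = datetime(2020, 1, 9, 12)
print(dates_via_method(sensor, run))                  # one date, an hour earlier
print(dates_via_method(round_trip(sensor), run))      # None
print(dates_via_attributes(round_trip(sensor), run))  # one date, an hour earlier
```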


[GitHub] [airflow] codecov-io commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-572737962
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=h1) Report
   > Merging [#7119](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/c880221d76cf21c86ecdd24d1d423b78d6ceb32d?src=pr&el=desc) will **decrease** coverage by `0.27%`.
   > The diff coverage is `100%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7119      +/-   ##
   ==========================================
   - Coverage   85.23%   84.96%   -0.28%     
   ==========================================
     Files         682      682              
     Lines       39045    39057      +12     
   ==========================================
   - Hits        33280    33184      -96     
   - Misses       5765     5873     +108
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/sensors/external\_task\_sensor.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZW5zb3JzL2V4dGVybmFsX3Rhc2tfc2Vuc29yLnB5) | `89.85% <100%> (+3.89%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `45.25% <0%> (-46.72%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [...rflow/contrib/operators/kubernetes\_pod\_operator.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZF9vcGVyYXRvci5weQ==) | `78.75% <0%> (-20%)` | :arrow_down: |
   | [airflow/contrib/operators/ssh\_operator.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL29wZXJhdG9ycy9zc2hfb3BlcmF0b3IucHk=) | `84.61% <0%> (+1.28%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=footer). Last update [c880221...49eac8a](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] kaxil edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-573258310
 
 
   > One thing I'm struggling with is how to get the `execution_delta` parameter to deserialize as a `timedelta`. Right now, it looks like it serializes to a float (as number of seconds) and doesn't get converted back on deserialization. I could manually convert it here https://github.com/apache/airflow/pull/7119/files#diff-108fa7b4349eeb1a31c07c64a952c972R52, but that seems a bit hacky to me. Looking at the `_deserialize` function here https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L200, it seems like there is the possibility to have it marked as a `timedelta`, but I can't seem to figure out where I would do that. Do I need to add typing to the property or maybe I define `__serialized_fields` differently?
   
   We have 2 options:
   
   1) Add `execution_delta` in https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L343
   
   2) Add `execution_delta` in `_decorated_fields` (https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L273)
   
   I think adding it in (1) probably makes more sense. WDYT, @ashb?
   
   Tests would pass with either option.
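Option (1) points at the operator-deserialization code; our reading (not verified against that exact revision) is that it converts a fixed set of attribute names from seconds back to `timedelta`. A minimal sketch of that name-based approach; `TIMEDELTA_FIELDS`, `serialize_operator` and `deserialize_operator` are hypothetical, and the real set of names in `serialized_objects.py` may differ.

```python
from datetime import timedelta
from typing import Any, Dict

# Hypothetical set of operator attributes stored as seconds that must be
# turned back into timedelta objects. Option (1) amounts to adding
# "execution_delta" to a set like this one.
TIMEDELTA_FIELDS = frozenset({"retry_delay", "execution_timeout", "execution_delta"})


def serialize_operator(attrs: Dict[str, Any]) -> Dict[str, Any]:
    return {k: v.total_seconds() if isinstance(v, timedelta) else v
            for k, v in attrs.items()}


def deserialize_operator(encoded: Dict[str, Any]) -> Dict[str, Any]:
    decoded = {}
    for key, value in encoded.items():
        if key in TIMEDELTA_FIELDS and value is not None:
            # The field name, not the value, says this was a timedelta.
            value = timedelta(seconds=value)
        decoded[key] = value
    return decoded


attrs = {"task_id": "wait_for_upstream", "execution_delta": timedelta(hours=1)}
print(serialize_operator(attrs)["execution_delta"])             # 3600.0
print(deserialize_operator(serialize_operator(attrs)) == attrs)  # True
```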


[GitHub] [airflow] kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r365457054
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -133,6 +133,9 @@ def register_inbuilt_operator_links() -> None:
     except ImportError:
         pass
 
+    from airflow.sensors.external_task_sensor import ExternalTaskLink
+    inbuilt_operator_links.update([ExternalTaskLink])
+
 
 Review comment:
   Let's do it the following way:
   
   ```python
       try:
           from airflow.sensors.external_task_sensor import ExternalTaskLink   # pylint: disable=R0401
           inbuilt_operator_links.update([ExternalTaskLink])
       except ImportError:
           pass
   ```
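The suggested form guards the import the same way as the registration just above it in `register_inbuilt_operator_links()`: if the import fails, that link is skipped rather than breaking plugin loading (`R0401` is pylint's `cyclic-import` check). A generic, standard-library-only sketch of the same guard; `register_optional_links` and the missing module name are hypothetical.

```python
import importlib
from typing import Iterable, Set, Tuple


def register_optional_links(candidates: Iterable[Tuple[str, str]]) -> Set[type]:
    """Collect classes from modules that may not be importable.

    ``from module import Name`` raises ImportError both when the module is
    missing and when the name is missing; with importlib + getattr those
    surface as ImportError and AttributeError, so both are caught here.
    """
    links: Set[type] = set()
    for module_name, class_name in candidates:
        try:
            module = importlib.import_module(module_name)
            link_cls = getattr(module, class_name)
        except (ImportError, AttributeError):
            # Skip this link instead of breaking registration of the others.
            continue
        links.add(link_cls)
    return links


found = register_optional_links([
    ("collections", "OrderedDict"),          # importable
    ("hypothetical_missing_module", "Link"),  # hypothetical, not installed
])
print(found)  # {<class 'collections.OrderedDict'>}
```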


[GitHub] [airflow] yuqian90 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
yuqian90 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366347435
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed on. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   `execution_date_fn` is always `None`? Does this mean `ExternalTaskLink` won't work with an `ExternalTaskSensor` that defines `execution_date_fn`?
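Going by the diff, yes: the link always passes `execution_date_fn=None`, so for a sensor configured with a function it computes the sensor's own execution date instead of the date the sensor actually waits for. The helper below is copied from the diff; `previous_day` is a hypothetical `execution_date_fn`.

```python
from datetime import datetime, timedelta


def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
    # Same logic as the helper in the diff above.
    if execution_delta:
        dttm = execution_date - execution_delta
    elif execution_date_fn:
        dttm = execution_date_fn(execution_date)
    else:
        dttm = execution_date
    return dttm if isinstance(dttm, list) else [dttm]


def previous_day(dt):
    # Hypothetical execution_date_fn a user might pass to the sensor.
    return dt - timedelta(days=1)


run = datetime(2020, 1, 9)
# What the sensor itself waits for:
sensor_targets = get_possible_target_execution_dates(run, None, previous_day)
# What the link computes, since it always passes execution_date_fn=None:
link_targets = get_possible_target_execution_dates(run, None, None)
print(sensor_targets == link_targets)  # False: the link points at the wrong run
```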


[GitHub] [airflow] codecov-io edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-572737962
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=h1) Report
   > Merging [#7119](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/28d8ef1eff901cce36e6c3fa53b944a1cf7fa00a?src=pr&el=desc) will **decrease** coverage by `0.77%`.
   > The diff coverage is `90%`.
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #7119      +/-   ##
   =========================================
   - Coverage   85.17%   84.4%   -0.78%     
   =========================================
     Files         683     710      +27     
     Lines       39155   39500     +345     
   =========================================
   - Hits        33352   33341      -11     
   - Misses       5803    6159     +356
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/serialization/serialized\_objects.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXJpYWxpemF0aW9uL3NlcmlhbGl6ZWRfb2JqZWN0cy5weQ==) | `90.03% <100%> (ø)` | :arrow_up: |
   | [airflow/plugins\_manager.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9wbHVnaW5zX21hbmFnZXIucHk=) | `89.44% <60%> (-0.95%)` | :arrow_down: |
   | [airflow/sensors/external\_task\_sensor.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZW5zb3JzL2V4dGVybmFsX3Rhc2tfc2Vuc29yLnB5) | `88.04% <95.83%> (+2.51%)` | :arrow_up: |
   | [airflow/operators/mysql\_operator.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvbXlzcWxfb3BlcmF0b3IucHk=) | `0% <0%> (-100%)` | :arrow_down: |
   | [airflow/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvbXlzcWxfdG9faGl2ZS5weQ==) | `0% <0%> (-100%)` | :arrow_down: |
   | [...flow/providers/apache/cassandra/hooks/cassandra.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2Nhc3NhbmRyYS9ob29rcy9jYXNzYW5kcmEucHk=) | `21.51% <0%> (-72.16%)` | :arrow_down: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/api/auth/backend/kerberos\_auth.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvYXV0aC9iYWNrZW5kL2tlcmJlcm9zX2F1dGgucHk=) | `28.16% <0%> (-54.93%)` | :arrow_down: |
   | [...irflow/contrib/operators/redis\_publish\_operator.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL29wZXJhdG9ycy9yZWRpc19wdWJsaXNoX29wZXJhdG9yLnB5) | `50% <0%> (-50%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | ... and [98 more](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=footer). Last update [28d8ef1...e0cab76](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] kaxil commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
kaxil commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-573232828
 
 
   > One thing I'm struggling with is how to get the `execution_delta` parameter to deserialize as a `timedelta`. Right now, it looks like it serializes to a float (as number of seconds) and doesn't get converted back on deserialization. I could manually convert it here https://github.com/apache/airflow/pull/7119/files#diff-108fa7b4349eeb1a31c07c64a952c972R52, but that seems a bit hacky to me. Looking at the `_deserialize` function here https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L200, it seems like there is the possibility to have it marked as a `timedelta`, but I can't seem to figure out where I would do that. Do I need to add typing to the property or maybe I define `__serialized_fields` differently?
   
   We already support `timedelta`: https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L222
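Our reading of the linked `_serialize`/`_deserialize` code is that a `timedelta` can be stored together with a type tag so it is rebuilt automatically, unlike a bare float. A minimal sketch of such tagged encoding; the `__type`/`__var` key names and the `encode`/`decode` helpers are assumptions, not a claim about Airflow's exact format.

```python
import json
from datetime import timedelta
from typing import Any

# Hypothetical tag names; the real encoding in serialized_objects.py may differ.
TYPE_KEY = "__type"
VALUE_KEY = "__var"


def encode(value: Any) -> Any:
    if isinstance(value, timedelta):
        # Store the type next to the value so the decoder needs no field list.
        return {TYPE_KEY: "timedelta", VALUE_KEY: value.total_seconds()}
    return value


def decode(value: Any) -> Any:
    if isinstance(value, dict) and value.get(TYPE_KEY) == "timedelta":
        return timedelta(seconds=value[VALUE_KEY])
    return value


payload = json.dumps({"execution_delta": encode(timedelta(hours=1))})
print(payload)  # {"execution_delta": {"__type": "timedelta", "__var": 3600.0}}
restored = {k: decode(v) for k, v in json.loads(payload).items()}
print(restored["execution_delta"] == timedelta(hours=1))  # True
```

Compared with a per-field list of names, the tag makes each value self-describing, at the cost of a slightly larger payload.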
   


[GitHub] [airflow] rconroy293 commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

Posted by GitBox <gi...@apache.org>.
rconroy293 commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-573230277
 
 
   One thing I'm struggling with is how to get the `execution_delta` parameter to deserialize as a `timedelta`. Right now, it looks like it serializes to a float (as number of seconds) and doesn't get converted back on deserialization. I could manually convert it here https://github.com/apache/airflow/pull/7119/files#diff-108fa7b4349eeb1a31c07c64a952c972R52, but that seems a bit hacky to me. Looking at the `_deserialize` function here https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L200, it seems like there is the possibility to have it marked as a `timedelta`, but I can't seem to figure out where I would do that. Do I need to add typing to the property or maybe I define `__serialized_fields` differently?
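The symptom described here can be reproduced without Airflow: `get_possible_target_execution_dates` computes `execution_date - execution_delta`, which works for a `timedelta` but raises `TypeError` for a float number of seconds. The sketch below shows both cases and the manual conversion the comment mentions; `try_subtract` and `coerce_delta` are hypothetical helpers.

```python
from datetime import datetime, timedelta
from typing import Any, Optional, Union


def try_subtract(execution_date: datetime, execution_delta: Any) -> Optional[datetime]:
    """Mirror `execution_date - execution_delta` from the helper in the diff,
    returning None instead of raising when the types don't support it."""
    try:
        return execution_date - execution_delta
    except TypeError:
        return None


def coerce_delta(value: Union[None, float, timedelta]) -> Optional[timedelta]:
    # The "manual conversion" workaround: accept seconds or a timedelta.
    if isinstance(value, (int, float)):
        return timedelta(seconds=value)
    return value


run = datetime(2020, 1, 9, 12)
print(try_subtract(run, timedelta(hours=1)))    # 2020-01-09 11:00:00
print(try_subtract(run, 3600.0))                # None: datetime - float raises TypeError
print(try_subtract(run, coerce_delta(3600.0)))  # 2020-01-09 11:00:00
```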


[GitHub] [airflow] kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r370940067
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   Any updates on the XCom part, @rconroy293?
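The module-level `get_possible_target_execution_dates` helper in the diff context above can be exercised on its own. The copy below mirrors the branch order from the diff (delta first, then the callable, then the sensor's own date); the type hints and the sample dates are additions for illustration only.

```python
import datetime
from typing import Callable, List, Optional, Union

DateOrDates = Union[datetime.datetime, List[datetime.datetime]]


def get_possible_target_execution_dates(
    execution_date: datetime.datetime,
    execution_delta: Optional[datetime.timedelta],
    execution_date_fn: Optional[Callable[[datetime.datetime], DateOrDates]],
) -> List[datetime.datetime]:
    """Return the external execution date(s) a sensor would wait for."""
    if execution_delta:
        # A positive delta looks back in time from the sensor's own date.
        dttm = execution_date - execution_delta
    elif execution_date_fn:
        # The callable may return a single datetime or a list of them.
        dttm = execution_date_fn(execution_date)
    else:
        # Default: the external DAG run shares the sensor's execution date.
        dttm = execution_date
    # Normalise to a list so callers can simply check len(...) == 1.
    return dttm if isinstance(dttm, list) else [dttm]


if __name__ == "__main__":
    sensor_date = datetime.datetime(2020, 1, 9, 12, 0)
    print(get_possible_target_execution_dates(sensor_date, datetime.timedelta(hours=1), None))
    print(get_possible_target_execution_dates(
        sensor_date, None, lambda d: [d, d - datetime.timedelta(days=1)]))
```

One subtlety of the `if execution_delta:` test: `timedelta(0)` is falsy, so a zero delta falls through to `execution_date_fn` (or to the default) rather than being applied.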


[GitHub] [airflow] kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

kaxil commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r365025752
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -21,14 +21,31 @@
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        if isinstance(operator, ExternalTaskSensor):
+            possible_execution_dates = operator.get_possible_target_execution_dates(execution_date=dttm)
 
 Review comment:
   The SerializedOperator won't have this method, so this call would fail.
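The failure mode in this comment can be modelled without Airflow: a deserialized operator carries the sensor's data fields but not its methods, and it is not an instance of the sensor class, so both the `isinstance` check and the method call break. The classes below are hypothetical stand-ins, not Airflow's real sensor or serialized operator; `target_dates` is a simplified version of the module-level, `getattr`-based helper that the later revision of the diff uses.

```python
import datetime
from typing import Optional


class FakeExternalTaskSensor:
    """Hypothetical stand-in for the real sensor: data attributes plus helper methods."""

    def __init__(self, execution_delta: Optional[datetime.timedelta] = None):
        self.execution_delta = execution_delta

    def get_possible_target_execution_dates(self, execution_date):
        # Method-based approach from the earlier revision of the PR.
        if self.execution_delta:
            return [execution_date - self.execution_delta]
        return [execution_date]


class FakeDeserializedOperator:
    """Hypothetical stand-in for a deserialized operator: plain attributes only."""

    def __init__(self, **fields):
        self.__dict__.update(fields)


def target_dates(operator, execution_date):
    # Reads only data attributes via getattr, so it works on the real
    # sensor and on the attribute-only deserialized stand-in alike.
    delta = getattr(operator, "execution_delta", None)
    return [execution_date - delta] if delta else [execution_date]


if __name__ == "__main__":
    day = datetime.timedelta(days=1)
    stub = FakeDeserializedOperator(execution_delta=day)
    print(hasattr(stub, "get_possible_target_execution_dates"))
    print(target_dates(stub, datetime.datetime(2020, 1, 9)))
```

The design point is that a link computed from serialized fields alone keeps working whether the webserver holds the original operator or a deserialized copy.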


[GitHub] [airflow] kaxil edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

kaxil edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-590573136
 
 
   Thanks @rconroy293, this looks good. Could you please rebase onto the latest master and resolve the conflicts?


[GitHub] [airflow] codecov-io edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

codecov-io edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-572737962
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=h1) Report
   > Merging [#7119](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/28d8ef1eff901cce36e6c3fa53b944a1cf7fa00a?src=pr&el=desc) will **increase** coverage by `0.35%`.
   > The diff coverage is `91.95%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7119      +/-   ##
   ==========================================
   + Coverage   85.17%   85.53%   +0.35%     
   ==========================================
     Files         683      754      +71     
     Lines       39155    39836     +681     
   ==========================================
   + Hits        33352    34074     +722     
   + Misses       5803     5762      -41
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ow/contrib/example\_dags/example\_qubole\_operator.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL2V4YW1wbGVfZGFncy9leGFtcGxlX3F1Ym9sZV9vcGVyYXRvci5weQ==) | `90% <ø> (ø)` | :arrow_up: |
   | [...mple\_dags/example\_automl\_nl\_text\_classification.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9nY3AvZXhhbXBsZV9kYWdzL2V4YW1wbGVfYXV0b21sX25sX3RleHRfY2xhc3NpZmljYXRpb24ucHk=) | `100% <ø> (ø)` | :arrow_up: |
   | [...flow/contrib/example\_dags/example\_qubole\_sensor.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL2V4YW1wbGVfZGFncy9leGFtcGxlX3F1Ym9sZV9zZW5zb3IucHk=) | `100% <ø> (ø)` | :arrow_up: |
   | [...ow/providers/apache/hive/sensors/hive\_partition.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL2hpdmUvc2Vuc29ycy9oaXZlX3BhcnRpdGlvbi5weQ==) | `36.36% <ø> (ø)` | |
   | [...low/providers/apache/spark/operators/spark\_jdbc.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYXBhY2hlL3NwYXJrL29wZXJhdG9ycy9zcGFya19qZGJjLnB5) | `92.5% <ø> (ø)` | |
   | [airflow/gcp/example\_dags/example\_functions.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9nY3AvZXhhbXBsZV9kYWdzL2V4YW1wbGVfZnVuY3Rpb25zLnB5) | `79.41% <ø> (ø)` | :arrow_up: |
   | [...ders/microsoft/azure/operators/wasb\_delete\_blob.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbWljcm9zb2Z0L2F6dXJlL29wZXJhdG9ycy93YXNiX2RlbGV0ZV9ibG9iLnB5) | `100% <ø> (ø)` | |
   | [airflow/gcp/example\_dags/example\_datastore.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9nY3AvZXhhbXBsZV9kYWdzL2V4YW1wbGVfZGF0YXN0b3JlLnB5) | `100% <ø> (ø)` | :arrow_up: |
   | [...ow/providers/google/cloud/operators/sftp\_to\_gcs.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9zZnRwX3RvX2djcy5weQ==) | `100% <ø> (ø)` | :arrow_up: |
   | [...contrib/example\_dags/example\_papermill\_operator.py](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb250cmliL2V4YW1wbGVfZGFncy9leGFtcGxlX3BhcGVybWlsbF9vcGVyYXRvci5weQ==) | `100% <ø> (ø)` | :arrow_up: |
   | ... and [346 more](https://codecov.io/gh/apache/airflow/pull/7119/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=footer). Last update [28d8ef1...dce9882](https://codecov.io/gh/apache/airflow/pull/7119?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
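The figures in the Codecov table are internally consistent: coverage is hits divided by lines, and misses are lines minus hits. The sketch below reproduces the reported percentages, assuming Codecov's default of two-decimal precision with rounding down; the delta is computed from the unrounded ratios and rounded down the same way, which matches the reported `+0.35%` here.

```python
import math


def coverage_pct(hits: int, lines: int) -> float:
    """Coverage in percent, rounded down to two decimals (assumed Codecov default)."""
    return math.floor(hits / lines * 10000) / 100


base_hits, base_lines = 33352, 39155  # master
head_hits, head_lines = 34074, 39836  # PR #7119

base_cov = coverage_pct(base_hits, base_lines)
head_cov = coverage_pct(head_hits, head_lines)
# Round the change down as well; math.floor is only "round down" for a positive change.
delta = math.floor((head_hits / head_lines - base_hits / base_lines) * 10000) / 100
misses = (base_lines - base_hits, head_lines - head_hits)

if __name__ == "__main__":
    print(base_cov, head_cov, delta, misses)
```

Note that plain rounding would give 85.18% and 85.54%, so the table's 85.17% and 85.53% only line up under round-down.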
   


[GitHub] [airflow] rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366496424
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,52 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    if execution_delta:
 
 Review comment:
   Added


[GitHub] [airflow] stale[bot] closed pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

stale[bot] closed pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119
 
 
   


[GitHub] [airflow] rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366496493
 
 

 ##########
 File path: airflow/plugins_manager.py
 ##########
 @@ -133,6 +133,9 @@ def register_inbuilt_operator_links() -> None:
     except ImportError:
         pass
 
+    from airflow.sensors.external_task_sensor import ExternalTaskLink
+    inbuilt_operator_links.update([ExternalTaskLink])
+
 
 Review comment:
   Fixed
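The `plugins_manager` hunk adds `ExternalTaskLink` to the inbuilt operator links. As I understand DAG serialization at the time, extra links are stored by class path and the webserver can only rebuild links whose classes it knows about, which is why registration matters. The sketch below models that lookup with hypothetical stand-ins (`BaseOperatorLink`, `serialize_links`, `deserialize_links` here are not Airflow's real code); unknown paths are silently skipped as a choice of this sketch.

```python
from typing import Dict, List, Set, Type


class BaseOperatorLink:
    """Hypothetical stand-in for Airflow's BaseOperatorLink."""
    name = ""

    def get_link(self, operator, dttm) -> str:
        raise NotImplementedError


class ExternalTaskLink(BaseOperatorLink):
    name = "External DAG"

    def get_link(self, operator, dttm) -> str:
        return ""  # URL building omitted in this sketch


def class_path(cls: type) -> str:
    # Fully qualified import path, used as the serialized key.
    return f"{cls.__module__}.{cls.__qualname__}"


inbuilt_operator_links: Set[Type[BaseOperatorLink]] = set()
inbuilt_operator_links.update([ExternalTaskLink])


def serialize_links(links: List[BaseOperatorLink]) -> List[str]:
    return [class_path(type(link)) for link in links]


def deserialize_links(paths: List[str]) -> List[BaseOperatorLink]:
    registry: Dict[str, Type[BaseOperatorLink]] = {
        class_path(cls): cls for cls in inbuilt_operator_links
    }
    # A path that was never registered cannot be turned back into a link.
    return [registry[p]() for p in paths if p in registry]
```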


[GitHub] [airflow] rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r366998333
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   @kaxil Could you confirm whether this approach using the DagBag would work with DAG serialization?


[GitHub] [airflow] kaxil edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

kaxil edited a comment on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-573232828
 
 
   > One thing I'm struggling with is how to get the `execution_delta` parameter to deserialize as a `timedelta`. Right now, it looks like it serializes to a float (as number of seconds) and doesn't get converted back on deserialization. I could manually convert it here https://github.com/apache/airflow/pull/7119/files#diff-108fa7b4349eeb1a31c07c64a952c972R52, but that seems a bit hacky to me. Looking at the `_deserialize` function here https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L200, it seems like there is the possibility to have it marked as a `timedelta`, but I can't seem to figure out where I would do that. Do I need to add typing to the property or maybe I define `__serialized_fields` differently?
   
   We already handle `timedelta`:
   On Serialization: https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L169
   On De-serialization: https://github.com/apache/airflow/blob/master/airflow/serialization/serialized_objects.py#L222
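The linked code relies on type-tagged encoding: a `timedelta` is written as its number of seconds together with a type marker, and the decoder uses the marker to rebuild the object. The sketch below illustrates that general idea and shows why a bare float comes back as a float; the `__type`/`__var` keys and the `encode`/`decode` names are assumptions for illustration, not a copy of `serialized_objects.py`.

```python
import datetime
import json
from typing import Any


def encode(value: Any) -> Any:
    # Tag timedeltas with their type so the decoder knows how to rebuild them.
    if isinstance(value, datetime.timedelta):
        return {"__type": "timedelta", "__var": value.total_seconds()}
    return value


def decode(value: Any) -> Any:
    # Only tagged values are converted back; everything else passes through.
    if isinstance(value, dict) and value.get("__type") == "timedelta":
        return datetime.timedelta(seconds=value["__var"])
    return value


delta = datetime.timedelta(hours=1)

# Untagged: only the float survives the JSON round trip, so the type is lost.
plain = json.loads(json.dumps(delta.total_seconds()))
# Tagged: the decoder sees the type marker and rebuilds a timedelta.
tagged = decode(json.loads(json.dumps(encode(delta))))

if __name__ == "__main__":
    print(type(plain).__name__, repr(tagged))
```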
   


[GitHub] [airflow] stale[bot] commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

stale[bot] commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-611787068
 
 
   This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
   


[GitHub] [airflow] mik-laj commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

mik-laj commented on issue #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#issuecomment-572804536
 
 
   @ashb @kaxil Will it work with DAG serialization enabled? Do you have another idea for implementing this?


[GitHub] [airflow] rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor

rconroy293 commented on a change in pull request #7119: [AIRFLOW-5840] Add operator extra link to external task sensor
URL: https://github.com/apache/airflow/pull/7119#discussion_r367030797
 
 

 ##########
 File path: airflow/sensors/external_task_sensor.py
 ##########
 @@ -16,22 +16,71 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
 import datetime
 import os
-from typing import Optional, Union
+from typing import FrozenSet, Optional, Union
 
 from sqlalchemy import func
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagBag, DagModel, DagRun, TaskInstance
+from airflow.models import BaseOperatorLink, DagBag, DagModel, DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.sensors.base_sensor_operator import BaseSensorOperator
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
 
 
+def get_possible_target_execution_dates(execution_date, execution_delta, execution_date_fn):
+    """
+    Gets the execution date(s) of an external DAG for which an
+    ExternalTaskSensor should succeed. Default is the execution
+    date itself, but it may be modified if a non-null execution delta
+    or execution date function is passed in.
+
+    :param execution_date: The execution date of the sensor
+    :type execution_date: datetime.datetime
+    :param execution_delta: Time difference between the sensor
+        execution date and the target DAG run execution date. Positive
+        delta looks back in time.
+    :type execution_delta: Optional[datetime.timedelta]
+    :param execution_date_fn: Function to compute the execution date(s)
+        of the target DAG run to look at given the sensor's execution
+        date.
+    :type execution_date_fn: Optional[Callable]
+    :return: Execution date(s) to wait for
+    :rtype: List[datetime.datetime]
+    """
+    if execution_delta:
+        dttm = execution_date - execution_delta
+    elif execution_date_fn:
+        dttm = execution_date_fn(execution_date)
+    else:
+        dttm = execution_date
+
+    return dttm if isinstance(dttm, list) else [dttm]
+
+
+class ExternalTaskLink(BaseOperatorLink):
+    name = 'External DAG'
+
+    def get_link(self, operator, dttm):
+        possible_execution_dates = get_possible_target_execution_dates(
+            execution_date=dttm,
+            execution_delta=getattr(operator, 'execution_delta', None),
+            execution_date_fn=None,
 
 Review comment:
   @yuqian90 For now, I've opted to ensure the link isn't added if `execution_date_fn` is provided.
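The rules from this thread and the PR description can be combined into one small decision: show a link only when the sensor targets exactly one external run, and show none when `execution_date_fn` is set (presumably because the callable is not available to the link at render time). The sketch below is hypothetical: the `build_external_dag_link` name, the `has_execution_date_fn` flag and the `/graph?dag_id=...&execution_date=...` URL shape are assumptions, not the PR's actual code.

```python
import datetime
from typing import List
from urllib.parse import urlencode


def build_external_dag_link(
    base_url: str,
    external_dag_id: str,
    target_dates: List[datetime.datetime],
    has_execution_date_fn: bool = False,
) -> str:
    """Return a graph-view URL for the single target run, or "" for no link."""
    if has_execution_date_fn or len(target_dates) != 1:
        # Ambiguous target: don't point the user at one arbitrary run.
        return ""
    query = urlencode({
        "dag_id": external_dag_id,
        "execution_date": target_dates[0].isoformat(),
    })
    return f"{base_url.rstrip('/')}/graph?{query}"


if __name__ == "__main__":
    d = datetime.datetime(2020, 1, 9)
    print(build_external_dag_link("http://localhost:8080/", "upstream", [d]))
```

Returning an empty string rather than raising keeps the sensor page rendering normally when no single target run exists.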
