You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/06 23:16:19 UTC

[airflow] branch master updated: Move latest_only_operator.py to latest_only.py (#11178) (#11304)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new bbc3cea  Move latest_only_operator.py to latest_only.py (#11178) (#11304)
bbc3cea is described below

commit bbc3cea057dc4a535c79856026ca95739853f90c
Author: Kishore Vancheeshwaran <24...@users.noreply.github.com>
AuthorDate: Wed Oct 7 04:45:28 2020 +0530

    Move latest_only_operator.py to latest_only.py (#11178) (#11304)
---
 airflow/example_dags/example_latest_only.py        |  2 +-
 .../example_latest_only_with_trigger.py            |  2 +-
 .../{latest_only_operator.py => latest_only.py}    |  0
 airflow/operators/latest_only_operator.py          | 54 ++++------------------
 docs/operators-and-hooks-ref.rst                   |  2 +-
 tests/deprecated_classes.py                        |  4 ++
 tests/operators/test_latest_only_operator.py       |  2 +-
 7 files changed, 16 insertions(+), 50 deletions(-)

diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index 1309d53..d01df73 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -22,7 +22,7 @@ import datetime as dt
 
 from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.operators.latest_only import LatestOnlyOperator
 from airflow.utils.dates import days_ago
 
 dag = DAG(
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index f662c60..c7cb2e9 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -24,7 +24,7 @@ import datetime as dt
 
 from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.operators.latest_only import LatestOnlyOperator
 from airflow.utils.dates import days_ago
 from airflow.utils.trigger_rule import TriggerRule
 
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only.py
similarity index 100%
copy from airflow/operators/latest_only_operator.py
copy to airflow/operators/latest_only.py
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index d19e1b0..7096aab 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -16,52 +16,14 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-This module contains an operator to run downstream tasks only for the
-latest scheduled DagRun
+This module is deprecated. Please use `airflow.operators.latest_only`.
 """
-from typing import Dict, Iterable, Union
+import warnings
 
-import pendulum
+# pylint: disable=unused-import
+from airflow.operators.latest_only import LatestOnlyOperator  # noqa
 
-from airflow.operators.branch_operator import BaseBranchOperator
-
-
-class LatestOnlyOperator(BaseBranchOperator):
-    """
-    Allows a workflow to skip tasks that are not running during the most
-    recent schedule interval.
-
-    If the task is run outside of the latest schedule interval (i.e. external_trigger),
-    all directly downstream tasks will be skipped.
-
-    Note that downstream tasks are never skipped if the given DAG_Run is
-    marked as externally triggered.
-    """
-
-    ui_color = '#e9ffdb'  # nyanza
-
-    def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
-        # If the DAG Run is externally triggered, then return without
-        # skipping downstream tasks
-        if context['dag_run'] and context['dag_run'].external_trigger:
-            self.log.info(
-                "Externally triggered DAG_Run: allowing execution to proceed.")
-            return list(context['task'].get_direct_relative_ids(upstream=False))
-
-        now = pendulum.now('UTC')
-        left_window = context['dag'].following_schedule(
-            context['execution_date'])
-        right_window = context['dag'].following_schedule(left_window)
-        self.log.info(
-            'Checking latest only with left_window: %s right_window: %s now: %s',
-            left_window, right_window, now
-        )
-
-        if not left_window < now <= right_window:
-            self.log.info('Not latest execution, skipping downstream.')
-            # we return an empty list, thus the parent BaseBranchOperator
-            # won't exclude any downstream tasks from skipping.
-            return []
-        else:
-            self.log.info('Latest, allowing execution to proceed.')
-            return list(context['task'].get_direct_relative_ids(upstream=False))
+warnings.warn(
+    "This module is deprecated. Please use `airflow.operators.latest_only`.",
+    DeprecationWarning, stacklevel=2
+)
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index c107160..1265600 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -73,7 +73,7 @@ Fundamentals
    * - :mod:`airflow.operators.generic_transfer`
      -
 
-   * - :mod:`airflow.operators.latest_only_operator`
+   * - :mod:`airflow.operators.latest_only`
      -
 
    * - :mod:`airflow.operators.python`
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 2d53951..855239e 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1339,6 +1339,10 @@ OPERATORS = [
         "airflow.providers.google.cloud.operators.text_to_speech.CloudTextToSpeechSynthesizeOperator",
         "airflow.contrib.operators.gcp_text_to_speech_operator.GcpTextToSpeechSynthesizeOperator",
     ),
+    (
+        "airflow.operators.latest_only.LatestOnlyOperator",
+        "airflow.operators.latest_only_operator.LatestOnlyOperator",
+    ),
 ]
 
 SECRETS = [
diff --git a/tests/operators/test_latest_only_operator.py b/tests/operators/test_latest_only_operator.py
index 935c199..b398756 100644
--- a/tests/operators/test_latest_only_operator.py
+++ b/tests/operators/test_latest_only_operator.py
@@ -25,7 +25,7 @@ from airflow import settings
 from airflow.models import DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.operators.latest_only import LatestOnlyOperator
 from airflow.utils import timezone
 from airflow.utils.session import create_session
 from airflow.utils.state import State