You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2020/10/06 23:16:19 UTC
[airflow] branch master updated: Move latest_only_operator.py to latest_only.py (#11178) (#11304)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new bbc3cea Move latest_only_operator.py to latest_only.py (#11178) (#11304)
bbc3cea is described below
commit bbc3cea057dc4a535c79856026ca95739853f90c
Author: Kishore Vancheeshwaran <24...@users.noreply.github.com>
AuthorDate: Wed Oct 7 04:45:28 2020 +0530
Move latest_only_operator.py to latest_only.py (#11178) (#11304)
---
airflow/example_dags/example_latest_only.py | 2 +-
.../example_latest_only_with_trigger.py | 2 +-
.../{latest_only_operator.py => latest_only.py} | 0
airflow/operators/latest_only_operator.py | 54 ++++------------------
docs/operators-and-hooks-ref.rst | 2 +-
tests/deprecated_classes.py | 4 ++
tests/operators/test_latest_only_operator.py | 2 +-
7 files changed, 16 insertions(+), 50 deletions(-)
diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py
index 1309d53..d01df73 100644
--- a/airflow/example_dags/example_latest_only.py
+++ b/airflow/example_dags/example_latest_only.py
@@ -22,7 +22,7 @@ import datetime as dt
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.dates import days_ago
dag = DAG(
diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py
index f662c60..c7cb2e9 100644
--- a/airflow/example_dags/example_latest_only_with_trigger.py
+++ b/airflow/example_dags/example_latest_only_with_trigger.py
@@ -24,7 +24,7 @@ import datetime as dt
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only.py
similarity index 100%
copy from airflow/operators/latest_only_operator.py
copy to airflow/operators/latest_only.py
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index d19e1b0..7096aab 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -16,52 +16,14 @@
# specific language governing permissions and limitations
# under the License.
"""
-This module contains an operator to run downstream tasks only for the
-latest scheduled DagRun
+This module is deprecated. Please use `airflow.operators.latest_only`.
"""
-from typing import Dict, Iterable, Union
+import warnings
-import pendulum
+# pylint: disable=unused-import
+from airflow.operators.latest_only import LatestOnlyOperator # noqa
-from airflow.operators.branch_operator import BaseBranchOperator
-
-
-class LatestOnlyOperator(BaseBranchOperator):
- """
- Allows a workflow to skip tasks that are not running during the most
- recent schedule interval.
-
- If the task is run outside of the latest schedule interval (i.e. external_trigger),
- all directly downstream tasks will be skipped.
-
- Note that downstream tasks are never skipped if the given DAG_Run is
- marked as externally triggered.
- """
-
- ui_color = '#e9ffdb' # nyanza
-
- def choose_branch(self, context: Dict) -> Union[str, Iterable[str]]:
- # If the DAG Run is externally triggered, then return without
- # skipping downstream tasks
- if context['dag_run'] and context['dag_run'].external_trigger:
- self.log.info(
- "Externally triggered DAG_Run: allowing execution to proceed.")
- return list(context['task'].get_direct_relative_ids(upstream=False))
-
- now = pendulum.now('UTC')
- left_window = context['dag'].following_schedule(
- context['execution_date'])
- right_window = context['dag'].following_schedule(left_window)
- self.log.info(
- 'Checking latest only with left_window: %s right_window: %s now: %s',
- left_window, right_window, now
- )
-
- if not left_window < now <= right_window:
- self.log.info('Not latest execution, skipping downstream.')
- # we return an empty list, thus the parent BaseBranchOperator
- # won't exclude any downstream tasks from skipping.
- return []
- else:
- self.log.info('Latest, allowing execution to proceed.')
- return list(context['task'].get_direct_relative_ids(upstream=False))
+warnings.warn(
+ "This module is deprecated. Please use `airflow.operators.latest_only`.",
+ DeprecationWarning, stacklevel=2
+)
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index c107160..1265600 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -73,7 +73,7 @@ Fundamentals
* - :mod:`airflow.operators.generic_transfer`
-
- * - :mod:`airflow.operators.latest_only_operator`
+ * - :mod:`airflow.operators.latest_only`
-
* - :mod:`airflow.operators.python`
diff --git a/tests/deprecated_classes.py b/tests/deprecated_classes.py
index 2d53951..855239e 100644
--- a/tests/deprecated_classes.py
+++ b/tests/deprecated_classes.py
@@ -1339,6 +1339,10 @@ OPERATORS = [
"airflow.providers.google.cloud.operators.text_to_speech.CloudTextToSpeechSynthesizeOperator",
"airflow.contrib.operators.gcp_text_to_speech_operator.GcpTextToSpeechSynthesizeOperator",
),
+ (
+ "airflow.operators.latest_only.LatestOnlyOperator",
+ "airflow.operators.latest_only_operator.LatestOnlyOperator",
+ ),
]
SECRETS = [
diff --git a/tests/operators/test_latest_only_operator.py b/tests/operators/test_latest_only_operator.py
index 935c199..b398756 100644
--- a/tests/operators/test_latest_only_operator.py
+++ b/tests/operators/test_latest_only_operator.py
@@ -25,7 +25,7 @@ from airflow import settings
from airflow.models import DagRun, TaskInstance
from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
-from airflow.operators.latest_only_operator import LatestOnlyOperator
+from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State