You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by el...@apache.org on 2021/03/13 18:25:41 UTC
[airflow] branch master updated: Rename DateTimeBranchOperator to
BranchDateTimeOperator (#14720)
This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 72d3708 Rename DateTimeBranchOperator to BranchDateTimeOperator (#14720)
72d3708 is described below
commit 72d37080951caa1e91df188277c045dde3500d58
Author: Tomás Farías Santana <to...@tomasfarias.dev>
AuthorDate: Sat Mar 13 19:25:26 2021 +0100
Rename DateTimeBranchOperator to BranchDateTimeOperator (#14720)
* Update names and paths to be consistent with other Branch operators
This included renaming the operator to match the '^Branch.*' pattern, like other operators do, as well as updating file and import paths to remove deprecated files (for example, changing the deprecated dummy_operator to dummy).
* Add missing reference to operator guide
---
...ator.py => example_branch_datetime_operator.py} | 20 +++++------
.../operators/{datetime_branch.py => datetime.py} | 8 +++--
.../operator/{datetime_branch.rst => datetime.rst} | 18 +++++-----
docs/apache-airflow/howto/operator/index.rst | 2 +-
.../{test_datetime_branch.py => test_datetime.py} | 42 +++++++++++-----------
5 files changed, 46 insertions(+), 44 deletions(-)
diff --git a/airflow/example_dags/example_datetime_branch_operator.py b/airflow/example_dags/example_branch_datetime_operator.py
similarity index 83%
rename from airflow/example_dags/example_datetime_branch_operator.py
rename to airflow/example_dags/example_branch_datetime_operator.py
index 58b1a0a..c0ddeef 100644
--- a/airflow/example_dags/example_datetime_branch_operator.py
+++ b/airflow/example_dags/example_branch_datetime_operator.py
@@ -23,8 +23,8 @@ targets.
import datetime
from airflow import DAG
-from airflow.operators.datetime_branch import DateTimeBranchOperator
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.datetime import BranchDateTimeOperator
+from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
args = {
@@ -32,18 +32,18 @@ args = {
}
dag = DAG(
- dag_id="example_datetime_branch_operator",
+ dag_id="example_branch_datetime_operator",
start_date=days_ago(2),
default_args=args,
tags=["example"],
schedule_interval="@daily",
)
-# [START howto_operator_datetime_branch]
+# [START howto_branch_datetime_operator]
dummy_task_1 = DummyOperator(task_id='date_in_range', dag=dag)
dummy_task_2 = DummyOperator(task_id='date_outside_range', dag=dag)
-cond1 = DateTimeBranchOperator(
+cond1 = BranchDateTimeOperator(
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
@@ -54,21 +54,21 @@ cond1 = DateTimeBranchOperator(
# Run dummy_task_1 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [dummy_task_1, dummy_task_2]
-# [END howto_operator_datetime_branch]
+# [END howto_branch_datetime_operator]
dag = DAG(
- dag_id="example_datetime_branch_operator_2",
+ dag_id="example_branch_datetime_operator_2",
start_date=days_ago(2),
default_args=args,
tags=["example"],
schedule_interval="@daily",
)
-# [START howto_operator_datetime_branch_next_day]
+# [START howto_branch_datetime_operator_next_day]
dummy_task_1 = DummyOperator(task_id='date_in_range', dag=dag)
dummy_task_2 = DummyOperator(task_id='date_outside_range', dag=dag)
-cond2 = DateTimeBranchOperator(
+cond2 = BranchDateTimeOperator(
task_id='datetime_branch',
follow_task_ids_if_true=['date_in_range'],
follow_task_ids_if_false=['date_outside_range'],
@@ -80,4 +80,4 @@ cond2 = DateTimeBranchOperator(
# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run dummy_task_1 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [dummy_task_1, dummy_task_2]
-# [END howto_operator_datetime_branch_next_day]
+# [END howto_branch_datetime_operator_next_day]
diff --git a/airflow/operators/datetime_branch.py b/airflow/operators/datetime.py
similarity index 93%
rename from airflow/operators/datetime_branch.py
rename to airflow/operators/datetime.py
index 868bd23..63aea91 100644
--- a/airflow/operators/datetime_branch.py
+++ b/airflow/operators/datetime.py
@@ -19,16 +19,18 @@ import datetime
from typing import Dict, Iterable, Union
from airflow.exceptions import AirflowException
-from airflow.operators.branch_operator import BaseBranchOperator
+from airflow.operators.branch import BaseBranchOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
-class DateTimeBranchOperator(BaseBranchOperator):
+class BranchDateTimeOperator(BaseBranchOperator):
"""
Branches into one of two lists of tasks depending on the current datetime.
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:BranchDateTimeOperator`
- True branch will be returned when `datetime.datetime.now()` falls below
+ True branch will be returned when ``datetime.datetime.now()`` falls below
``target_upper`` and above ``target_lower``.
:param follow_task_ids_if_true: task id or task ids to follow if
diff --git a/docs/apache-airflow/howto/operator/datetime_branch.rst b/docs/apache-airflow/howto/operator/datetime.rst
similarity index 69%
rename from docs/apache-airflow/howto/operator/datetime_branch.rst
rename to docs/apache-airflow/howto/operator/datetime.rst
index b798468..37f18b5 100644
--- a/docs/apache-airflow/howto/operator/datetime_branch.rst
+++ b/docs/apache-airflow/howto/operator/datetime.rst
@@ -17,23 +17,23 @@
-.. _howto/operator:DatetimeBranch:
+.. _howto/operator:BranchDateTimeOperator:
-DatetimeBranchOperator
+BranchDateTimeOperator
======================
-Use the :class:`~airflow.operators.datetime_branch.DatetimeBranchOperator` to branch into one of two execution paths depending on whether the date and/or time of execution falls into the range given by two target arguments.
+Use the :class:`~airflow.operators.datetime.BranchDateTimeOperator` to branch into one of two execution paths depending on whether the date and/or time of execution falls into the range given by two target arguments.
-.. exampleinclude:: /../../airflow/example_dags/example_datetime_branch_operator.py
+.. exampleinclude:: /../../airflow/example_dags/example_branch_datetime_operator.py
:language: python
- :start-after: [START howto_operator_datetime_branch]
- :end-before: [END howto_operator_datetime_branch]
+ :start-after: [START howto_branch_datetime_operator]
+ :end-before: [END howto_branch_datetime_operator]
The target parameters, ``target_upper`` and ``target_lower``, can receive a ``datetime.datetime``, a ``datetime.time``, or ``None``. When a ``datetime.time`` object is used, it will be combined with the current date in order to allow comparisons with it. In the event that ``target_upper`` is set to a ``datetime.time`` that occurs before the given ``target_lower``, a day will be added to ``target_upper``. This is done to allow for time periods that span over two dates.
-.. exampleinclude:: /../../airflow/example_dags/example_datetime_branch_operator.py
+.. exampleinclude:: /../../airflow/example_dags/example_branch_datetime_operator.py
:language: python
- :start-after: [START howto_operator_datetime_branch_next_day]
- :end-before: [END howto_operator_datetime_branch_next_day]
+ :start-after: [START howto_branch_datetime_operator_next_day]
+ :end-before: [END howto_branch_datetime_operator_next_day]
If a target parameter is set to ``None``, the operator will perform a unilateral comparison using only the non-``None`` target. Setting both ``target_upper`` and ``target_lower`` to ``None`` will raise an exception.
diff --git a/docs/apache-airflow/howto/operator/index.rst b/docs/apache-airflow/howto/operator/index.rst
index 71cdf66..062e1fe 100644
--- a/docs/apache-airflow/howto/operator/index.rst
+++ b/docs/apache-airflow/howto/operator/index.rst
@@ -32,7 +32,7 @@ determine what actually executes when your DAG runs.
:maxdepth: 2
bash
- datetime_branch
+ datetime
python
weekday
external_task_sensor
diff --git a/tests/operators/test_datetime_branch.py b/tests/operators/test_datetime.py
similarity index 87%
rename from tests/operators/test_datetime_branch.py
rename to tests/operators/test_datetime.py
index cfc2a86..ddfd83a 100644
--- a/tests/operators/test_datetime_branch.py
+++ b/tests/operators/test_datetime.py
@@ -23,8 +23,8 @@ import freezegun
from airflow.exceptions import AirflowException
from airflow.models import DAG, DagRun, TaskInstance as TI
-from airflow.operators.datetime_branch import DateTimeBranchOperator
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.datetime import BranchDateTimeOperator
+from airflow.operators.dummy import DummyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import State
@@ -33,7 +33,7 @@ DEFAULT_DATE = timezone.datetime(2016, 1, 1)
INTERVAL = datetime.timedelta(hours=12)
-class TestDateTimeBranchOperator(unittest.TestCase):
+class TestBranchDateTimeOperator(unittest.TestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
@@ -52,7 +52,7 @@ class TestDateTimeBranchOperator(unittest.TestCase):
def setUp(self):
self.dag = DAG(
- 'datetime_branch_operator_test',
+ 'branch_datetime_operator_test',
default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE},
schedule_interval=INTERVAL,
)
@@ -60,7 +60,7 @@ class TestDateTimeBranchOperator(unittest.TestCase):
self.branch_1 = DummyOperator(task_id='branch_1', dag=self.dag)
self.branch_2 = DummyOperator(task_id='branch_2', dag=self.dag)
- self.branch_op = DateTimeBranchOperator(
+ self.branch_op = BranchDateTimeOperator(
task_id='datetime_branch',
follow_task_ids_if_true='branch_1',
follow_task_ids_if_false='branch_2',
@@ -100,9 +100,9 @@ class TestDateTimeBranchOperator(unittest.TestCase):
)
def test_no_target_time(self):
- """Check if DateTimeBranchOperator raises exception on missing target"""
+ """Check if BranchDateTimeOperator raises exception on missing target"""
with self.assertRaises(AirflowException):
- DateTimeBranchOperator(
+ BranchDateTimeOperator(
task_id='datetime_branch',
follow_task_ids_if_true='branch_1',
follow_task_ids_if_false='branch_2',
@@ -112,8 +112,8 @@ class TestDateTimeBranchOperator(unittest.TestCase):
)
@freezegun.freeze_time("2020-07-07 10:54:05")
- def test_datetime_branch_operator_falls_within_range(self):
- """Check DateTimeBranchOperator branch operation"""
+ def test_branch_datetime_operator_falls_within_range(self):
+ """Check BranchDateTimeOperator branch operation"""
for target_lower, target_upper in self.targets:
with self.subTest(target_lower=target_lower, target_upper=target_upper):
self.branch_op.target_lower = target_lower
@@ -128,8 +128,8 @@ class TestDateTimeBranchOperator(unittest.TestCase):
}
)
- def test_datetime_branch_operator_falls_outside_range(self):
- """Check DateTimeBranchOperator branch operation"""
+ def test_branch_datetime_operator_falls_outside_range(self):
+ """Check BranchDateTimeOperator branch operation"""
dates = [
datetime.datetime(2020, 7, 7, 12, 0, 0, tzinfo=datetime.timezone.utc),
datetime.datetime(2020, 6, 7, 12, 0, 0, tzinfo=datetime.timezone.utc),
@@ -153,8 +153,8 @@ class TestDateTimeBranchOperator(unittest.TestCase):
)
@freezegun.freeze_time("2020-07-07 10:54:05")
- def test_datetime_branch_operator_upper_comparison_within_range(self):
- """Check DateTimeBranchOperator branch operation"""
+ def test_branch_datetime_operator_upper_comparison_within_range(self):
+ """Check BranchDateTimeOperator branch operation"""
for _, target_upper in self.targets:
with self.subTest(target_upper=target_upper):
self.branch_op.target_upper = target_upper
@@ -171,8 +171,8 @@ class TestDateTimeBranchOperator(unittest.TestCase):
)
@freezegun.freeze_time("2020-07-07 10:54:05")
- def test_datetime_branch_operator_lower_comparison_within_range(self):
- """Check DateTimeBranchOperator branch operation"""
+ def test_branch_datetime_operator_lower_comparison_within_range(self):
+ """Check BranchDateTimeOperator branch operation"""
for target_lower, _ in self.targets:
with self.subTest(target_lower=target_lower):
self.branch_op.target_lower = target_lower
@@ -189,8 +189,8 @@ class TestDateTimeBranchOperator(unittest.TestCase):
)
@freezegun.freeze_time("2020-07-07 12:00:00")
- def test_datetime_branch_operator_upper_comparison_outside_range(self):
- """Check DateTimeBranchOperator branch operation"""
+ def test_branch_datetime_operator_upper_comparison_outside_range(self):
+ """Check BranchDateTimeOperator branch operation"""
for _, target_upper in self.targets:
with self.subTest(target_upper=target_upper):
self.branch_op.target_upper = target_upper
@@ -207,8 +207,8 @@ class TestDateTimeBranchOperator(unittest.TestCase):
)
@freezegun.freeze_time("2020-07-07 09:00:00")
- def test_datetime_branch_operator_lower_comparison_outside_range(self):
- """Check DateTimeBranchOperator branch operation"""
+ def test_branch_datetime_operator_lower_comparison_outside_range(self):
+ """Check BranchDateTimeOperator branch operation"""
for target_lower, _ in self.targets:
with self.subTest(target_lower=target_lower):
self.branch_op.target_lower = target_lower
@@ -225,8 +225,8 @@ class TestDateTimeBranchOperator(unittest.TestCase):
)
@freezegun.freeze_time("2020-12-01 09:00:00")
- def test_datetime_branch_operator_use_task_execution_date(self):
- """Check if DateTimeBranchOperator uses task execution date"""
+ def test_branch_datetime_operator_use_task_execution_date(self):
+ """Check if BranchDateTimeOperator uses task execution date"""
in_between_date = timezone.datetime(2020, 7, 7, 10, 30, 0)
self.branch_op.use_task_execution_date = True
self.dr = self.dag.create_dagrun(