You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/31 15:29:08 UTC

[GitHub] [airflow] eladkal opened a new pull request #13997: Weekday operator

eladkal opened a new pull request #13997:
URL: https://github.com/apache/airflow/pull/13997


   Add a new core operator, `BranchDayOfWeekOperator`.
   Inspired by [DayOfWeekSensor](https://github.com/apache/airflow/blob/master/airflow/sensors/weekday.py#L25) & [BranchPythonOperator](https://github.com/apache/airflow/blob/master/airflow/operators/python.py#L309)
   
   Use case: branch a workflow based on the day of the week.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#discussion_r568679481



##########
File path: airflow/operators/weekday.py
##########
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Union
+
+from airflow.operators.branch import BaseBranchOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.weekday import WeekDay
+
+
+class BranchDayOfWeekOperator(BaseBranchOperator):
+    """
+    Branches into one of two lists of tasks depending on the current day.
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:BranchDayOfWeekOperator`
+
+    :param follow_task_ids_if_true: task id or task ids to follow if criteria met
+    :type follow_task_ids_if_true: str or list[str]
+    :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met
+    :type follow_task_ids_if_false: str or list[str]
+    :param week_day: Day of the week to check (full name). Optionally, a set
+        of days can also be provided using a set.
+        Example values:
+
+            * ``"MONDAY"``,
+            * ``{"Saturday", "Sunday"}``
+            * ``{WeekDay.TUESDAY}``
+            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+
+    :type week_day: set or str or airflow.utils.weekday.WeekDay
+    :param use_task_execution_day: If ``True``, uses task's execution day to compare
+        with is_today. Execution Date is Useful for backfilling.
+        If ``False``, uses system's day of the week.
+    :type use_task_execution_day: bool
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        follow_task_ids_if_true: Union[str, Iterable[str]],
+        follow_task_ids_if_false: Union[str, Iterable[str]],
+        week_day: Union[str, Iterable[str]],
+        use_task_execution_day: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.week_day = week_day
+        self.use_task_execution_day = use_task_execution_day
+        self._week_day_num = None

Review comment:
       aah right, in that case let's keep that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#discussion_r568680625



##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()

Review comment:
       ```suggestion
       def setUpClass(cls):
   ```

##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()

Review comment:
       ```suggestion
   ```

##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()
+        self.dag = DAG(
+            "branch_day_of_week_operator_test",
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
+            schedule_interval=INTERVAL,
+        )
+        self.branch_1 = DummyOperator(task_id="branch_1", dag=self.dag)
+        self.branch_2 = DummyOperator(task_id="branch_2", dag=self.dag)
+        self.branch_3 = None
+
+    def tearDown(self):
+        super().tearDown()

Review comment:
       ```suggestion
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#issuecomment-772426267


   @kaxil fixed


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#issuecomment-770406155


   [The Workflow run](https://github.com/apache/airflow/actions/runs/525561718) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#discussion_r568675209



##########
File path: airflow/operators/weekday.py
##########
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Union
+
+from airflow.operators.branch import BaseBranchOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.weekday import WeekDay
+
+
+class BranchDayOfWeekOperator(BaseBranchOperator):
+    """
+    Branches into one of two lists of tasks depending on the current day.
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:BranchDayOfWeekOperator`
+
+    :param follow_task_ids_if_true: task id or task ids to follow if criteria met
+    :type follow_task_ids_if_true: str or list[str]
+    :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met
+    :type follow_task_ids_if_false: str or list[str]
+    :param week_day: Day of the week to check (full name). Optionally, a set
+        of days can also be provided using a set.
+        Example values:
+
+            * ``"MONDAY"``,
+            * ``{"Saturday", "Sunday"}``
+            * ``{WeekDay.TUESDAY}``
+            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+
+    :type week_day: set or str or airflow.utils.weekday.WeekDay
+    :param use_task_execution_day: If ``True``, uses task's execution day to compare
+        with is_today. Execution Date is Useful for backfilling.
+        If ``False``, uses system's day of the week.
+    :type use_task_execution_day: bool
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        follow_task_ids_if_true: Union[str, Iterable[str]],
+        follow_task_ids_if_false: Union[str, Iterable[str]],
+        week_day: Union[str, Iterable[str]],
+        use_task_execution_day: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.week_day = week_day
+        self.use_task_execution_day = use_task_execution_day
+        self._week_day_num = None

Review comment:
       @kaxil It seems there is already a convention for this. See BranchSQLOperator: https://github.com/apache/airflow/blob/master/airflow/operators/sql.py#L494




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil merged pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #13997:
URL: https://github.com/apache/airflow/pull/13997


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#discussion_r568636245



##########
File path: airflow/operators/weekday.py
##########
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Union
+
+from airflow.operators.branch import BaseBranchOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.weekday import WeekDay
+
+
+class BranchDayOfWeekOperator(BaseBranchOperator):
+    """
+    Branches into one of two lists of tasks depending on the current day.
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:BranchDayOfWeekOperator`
+
+    :param follow_task_ids_if_true: task id or task ids to follow if criteria met
+    :type follow_task_ids_if_true: str or list[str]
+    :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met
+    :type follow_task_ids_if_false: str or list[str]
+    :param week_day: Day of the week to check (full name). Optionally, a set
+        of days can also be provided using a set.
+        Example values:
+
+            * ``"MONDAY"``,
+            * ``{"Saturday", "Sunday"}``
+            * ``{WeekDay.TUESDAY}``
+            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+
+    :type week_day: set or str or airflow.utils.weekday.WeekDay
+    :param use_task_execution_day: If ``True``, uses task's execution day to compare
+        with is_today. Execution Date is Useful for backfilling.
+        If ``False``, uses system's day of the week.
+    :type use_task_execution_day: bool
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        follow_task_ids_if_true: Union[str, Iterable[str]],
+        follow_task_ids_if_false: Union[str, Iterable[str]],
+        week_day: Union[str, Iterable[str]],
+        use_task_execution_day: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.week_day = week_day
+        self.use_task_execution_day = use_task_execution_day
+        self._week_day_num = None

Review comment:
       `follow_task_ids_if_true` sounds like it takes a boolean value.
   
   Maybe `tasks_to_follow_if_true`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#issuecomment-770449676


   [The Workflow run](https://github.com/apache/airflow/actions/runs/525966875) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#discussion_r568680313



##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()
+        self.dag = DAG(
+            "branch_day_of_week_operator_test",
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
+            schedule_interval=INTERVAL,

Review comment:
       ```suggestion
               "branch_day_of_week_operator_test",
               start_date=DEFAULT_DATE,
               schedule_interval=INTERVAL,
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal commented on a change in pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
eladkal commented on a change in pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#discussion_r568675209



##########
File path: airflow/operators/weekday.py
##########
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Union
+
+from airflow.operators.branch import BaseBranchOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.weekday import WeekDay
+
+
+class BranchDayOfWeekOperator(BaseBranchOperator):
+    """
+    Branches into one of two lists of tasks depending on the current day.
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:BranchDayOfWeekOperator`
+
+    :param follow_task_ids_if_true: task id or task ids to follow if criteria met
+    :type follow_task_ids_if_true: str or list[str]
+    :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met
+    :type follow_task_ids_if_false: str or list[str]
+    :param week_day: Day of the week to check (full name). Optionally, a set
+        of days can also be provided using a set.
+        Example values:
+
+            * ``"MONDAY"``,
+            * ``{"Saturday", "Sunday"}``
+            * ``{WeekDay.TUESDAY}``
+            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+
+    :type week_day: set or str or airflow.utils.weekday.WeekDay
+    :param use_task_execution_day: If ``True``, uses task's execution day to compare
+        with is_today. Execution Date is Useful for backfilling.
+        If ``False``, uses system's day of the week.
+    :type use_task_execution_day: bool
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        follow_task_ids_if_true: Union[str, Iterable[str]],
+        follow_task_ids_if_false: Union[str, Iterable[str]],
+        week_day: Union[str, Iterable[str]],
+        use_task_execution_day: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.week_day = week_day
+        self.use_task_execution_day = use_task_execution_day
+        self._week_day_num = None

Review comment:
       @kaxil It seems there is already a convention for this. See BranchSQLOperator: https://github.com/apache/airflow/blob/master/airflow/operators/sql.py#L494




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#issuecomment-770417689


   [The Workflow run](https://github.com/apache/airflow/actions/runs/525666493) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] github-actions[bot] commented on pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#issuecomment-770430029


   [The Workflow run](https://github.com/apache/airflow/actions/runs/525784682) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #13997: Add BranchDayOfWeekOperator

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13997:
URL: https://github.com/apache/airflow/pull/13997#discussion_r568636245



##########
File path: airflow/operators/weekday.py
##########
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Union
+
+from airflow.operators.branch import BaseBranchOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.weekday import WeekDay
+
+
+class BranchDayOfWeekOperator(BaseBranchOperator):
+    """
+    Branches into one of two lists of tasks depending on the current day.
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:BranchDayOfWeekOperator`
+
+    :param follow_task_ids_if_true: task id or task ids to follow if criteria met
+    :type follow_task_ids_if_true: str or list[str]
+    :param follow_task_ids_if_false: task id or task ids to follow if criteria does not met
+    :type follow_task_ids_if_false: str or list[str]
+    :param week_day: Day of the week to check (full name). Optionally, a set
+        of days can also be provided using a set.
+        Example values:
+
+            * ``"MONDAY"``,
+            * ``{"Saturday", "Sunday"}``
+            * ``{WeekDay.TUESDAY}``
+            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+
+    :type week_day: set or str or airflow.utils.weekday.WeekDay
+    :param use_task_execution_day: If ``True``, uses task's execution day to compare
+        with is_today. Execution Date is Useful for backfilling.
+        If ``False``, uses system's day of the week.
+    :type use_task_execution_day: bool
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        follow_task_ids_if_true: Union[str, Iterable[str]],
+        follow_task_ids_if_false: Union[str, Iterable[str]],
+        week_day: Union[str, Iterable[str]],
+        use_task_execution_day: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.week_day = week_day
+        self.use_task_execution_day = use_task_execution_day
+        self._week_day_num = None

Review comment:
       `follow_task_ids_if_true` sounds like it takes a boolean value.
   
   Maybe `tasks_to_follow_if_true`?

##########
File path: airflow/operators/weekday.py
##########
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Dict, Iterable, Union
+
+from airflow.operators.branch import BaseBranchOperator
+from airflow.utils import timezone
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.weekday import WeekDay
+
+
+class BranchDayOfWeekOperator(BaseBranchOperator):
+    """
+    Branches into one of two lists of tasks depending on the current day.
+    For more information on how to use this operator, take a look at the guide:
+    :ref:`howto/operator:BranchDayOfWeekOperator`
+
+    :param follow_task_ids_if_true: task id or task ids to follow if the criteria is met
+    :type follow_task_ids_if_true: str or list[str]
+    :param follow_task_ids_if_false: task id or task ids to follow if the criteria is not met
+    :type follow_task_ids_if_false: str or list[str]
+    :param week_day: Day of the week to check (full name). Optionally, several
+        days can be provided as a set.
+        Example values:
+
+            * ``"MONDAY"``,
+            * ``{"Saturday", "Sunday"}``
+            * ``{WeekDay.TUESDAY}``
+            * ``{WeekDay.SATURDAY, WeekDay.SUNDAY}``
+
+    :type week_day: set or str or airflow.utils.weekday.WeekDay
+    :param use_task_execution_day: If ``True``, uses the task's execution day for the
+        comparison. The execution date is useful for backfilling.
+        If ``False``, uses the system's day of the week.
+    :type use_task_execution_day: bool
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        follow_task_ids_if_true: Union[str, Iterable[str]],
+        follow_task_ids_if_false: Union[str, Iterable[str]],
+        week_day: Union[str, Iterable[str]],
+        use_task_execution_day: bool = False,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.follow_task_ids_if_true = follow_task_ids_if_true
+        self.follow_task_ids_if_false = follow_task_ids_if_false
+        self.week_day = week_day
+        self.use_task_execution_day = use_task_execution_day
+        self._week_day_num = None

Review comment:
       Ah, right. In that case, let's keep it.

##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()
+        self.dag = DAG(
+            "branch_day_of_week_operator_test",
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
+            schedule_interval=INTERVAL,

Review comment:
       ```suggestion
               "branch_day_of_week_operator_test",
               start_date=DEFAULT_DATE,
               schedule_interval=INTERVAL,
   ```

##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()

Review comment:
       ```suggestion
       def setUpClass(cls):
   ```

##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()

Review comment:
       ```suggestion
   ```

##########
File path: tests/operators/test_weekday.py
##########
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import unittest
+
+import pytest
+from freezegun import freeze_time
+from parameterized import parameterized
+
+from airflow.exceptions import AirflowException
+from airflow.models import DAG, DagRun, TaskInstance as TI
+from airflow.operators.dummy import DummyOperator
+from airflow.operators.weekday import BranchDayOfWeekOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from airflow.utils.state import State
+from airflow.utils.weekday import WeekDay
+
+DEFAULT_DATE = timezone.datetime(2020, 2, 5)  # Wednesday
+INTERVAL = datetime.timedelta(hours=12)
+
+
+class TestBranchDayOfWeekOperator(unittest.TestCase):
+    """
+    Tests for BranchDayOfWeekOperator
+    """
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        with create_session() as session:
+            session.query(DagRun).delete()
+            session.query(TI).delete()
+
+    def setUp(self):
+        super().setUp()
+        self.dag = DAG(
+            "branch_day_of_week_operator_test",
+            default_args={"owner": "airflow", "start_date": DEFAULT_DATE},
+            schedule_interval=INTERVAL,
+        )
+        self.branch_1 = DummyOperator(task_id="branch_1", dag=self.dag)
+        self.branch_2 = DummyOperator(task_id="branch_2", dag=self.dag)
+        self.branch_3 = None
+
+    def tearDown(self):
+        super().tearDown()

Review comment:
       ```suggestion
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org