You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:24 UTC

[airflow] 05/16: Drop Python 3.6 compatibility objects/modules (#24048)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 46d26c76af43f0e5acfe299176c906414f69bcbd
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Wed Jun 1 08:15:56 2022 +0300

    Drop Python 3.6 compatibility objects/modules (#24048)
    
    (cherry picked from commit 1dccaad46b901189c1928cef8419f1ea1160d550)
---
 airflow/compat/asyncio.py           | 28 ----------------------------
 airflow/jobs/triggerer_job.py       |  5 ++---
 airflow/models/dag.py               |  6 +++---
 airflow/typing_compat.py            |  9 ---------
 airflow/utils/log/secrets_masker.py |  4 +---
 tests/triggers/test_temporal.py     |  5 ++---
 6 files changed, 8 insertions(+), 49 deletions(-)

diff --git a/airflow/compat/asyncio.py b/airflow/compat/asyncio.py
deleted file mode 100644
index a999ce604d..0000000000
--- a/airflow/compat/asyncio.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-try:
-    from asyncio import create_task
-except ImportError:
-    # create_task is not present in Python 3.6. Once Airflow is at 3.7+, we can
-    # remove this helper.
-    def create_task(*args, **kwargs):  # type: ignore
-        """A version of create_task that always errors."""
-        raise RuntimeError("Airflow's async functionality is only available on Python 3.7+")
-
-
-__all__ = ["create_task"]
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index 2c0a0bd5bf..ac7d22a6b1 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -26,7 +26,6 @@ from typing import Deque, Dict, Set, Tuple, Type
 
 from sqlalchemy import func
 
-from airflow.compat.asyncio import create_task
 from airflow.configuration import conf
 from airflow.jobs.base_job import BaseJob
 from airflow.models.trigger import Trigger
@@ -236,7 +235,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
         The loop in here runs trigger addition/deletion/cleanup. Actual
         triggers run in their own separate coroutines.
         """
-        watchdog = create_task(self.block_watchdog())
+        watchdog = asyncio.create_task(self.block_watchdog())
         last_status = time.time()
         while not self.stop:
             # Run core logic
@@ -263,7 +262,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             trigger_id, trigger_instance = self.to_create.popleft()
             if trigger_id not in self.triggers:
                 self.triggers[trigger_id] = {
-                    "task": create_task(self.run_trigger(trigger_id, trigger_instance)),
+                    "task": asyncio.create_task(self.run_trigger(trigger_id, trigger_instance)),
                     "name": f"{trigger_instance!r} (ID {trigger_id})",
                     "events": 0,
                 }
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 823287dcb1..4370f36c3a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -78,7 +78,7 @@ from airflow.stats import Stats
 from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
 from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
 from airflow.timetables.simple import NullTimetable, OnceTimetable
-from airflow.typing_compat import Literal, RePatternType
+from airflow.typing_compat import Literal
 from airflow.utils import timezone
 from airflow.utils.dag_cycle_tester import check_cycle
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
@@ -1998,7 +1998,7 @@ class DAG(LoggingMixin):
 
     def partial_subset(
         self,
-        task_ids_or_regex: Union[str, RePatternType, Iterable[str]],
+        task_ids_or_regex: Union[str, re.Pattern, Iterable[str]],
         include_downstream=False,
         include_upstream=True,
         include_direct_upstream=False,
@@ -2026,7 +2026,7 @@ class DAG(LoggingMixin):
         memo = {id(self.task_dict): None, id(self._task_group): None}
         dag = copy.deepcopy(self, memo)  # type: ignore
 
-        if isinstance(task_ids_or_regex, (str, RePatternType)):
+        if isinstance(task_ids_or_regex, (str, re.Pattern)):
             matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)]
         else:
             matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]
diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py
index 9f1185f76c..163889b8a2 100644
--- a/airflow/typing_compat.py
+++ b/airflow/typing_compat.py
@@ -28,12 +28,3 @@ try:
     from typing import Literal, Protocol, TypedDict, runtime_checkable  # type: ignore
 except ImportError:
     from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable  # type: ignore # noqa
-
-
-# Before Py 3.7, there is no re.Pattern class
-try:
-    from re import Pattern as RePatternType  # type: ignore
-except ImportError:
-    import re
-
-    RePatternType = type(re.compile('', 0))  # type: ignore
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index c4c1c390b2..de038be48b 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -24,8 +24,6 @@ from airflow import settings
 from airflow.compat.functools import cache, cached_property
 
 if TYPE_CHECKING:
-    from airflow.typing_compat import RePatternType
-
     RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]]
 
 
@@ -115,7 +113,7 @@ def _secrets_masker() -> "SecretsMasker":
 class SecretsMasker(logging.Filter):
     """Redact secrets from logs"""
 
-    replacer: Optional["RePatternType"] = None
+    replacer: Optional[re.Pattern] = None
     patterns: Set[str]
 
     ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py
index 8f2f222d71..1f22694fae 100644
--- a/tests/triggers/test_temporal.py
+++ b/tests/triggers/test_temporal.py
@@ -21,7 +21,6 @@ import datetime
 import pendulum
 import pytest
 
-from airflow.compat.asyncio import create_task
 from airflow.triggers.base import TriggerEvent
 from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.utils import timezone
@@ -72,7 +71,7 @@ async def test_datetime_trigger_timing():
 
     # Create a task that runs the trigger for a short time then cancels it
     trigger = DateTimeTrigger(future_moment)
-    trigger_task = create_task(trigger.run().__anext__())
+    trigger_task = asyncio.create_task(trigger.run().__anext__())
     await asyncio.sleep(0.5)
 
     # It should not have produced a result
@@ -81,7 +80,7 @@ async def test_datetime_trigger_timing():
 
     # Now, make one waiting for en event in the past and do it again
     trigger = DateTimeTrigger(past_moment)
-    trigger_task = create_task(trigger.run().__anext__())
+    trigger_task = asyncio.create_task(trigger.run().__anext__())
     await asyncio.sleep(0.5)
 
     assert trigger_task.done() is True