You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2022/07/05 14:47:24 UTC
[airflow] 05/16: Drop Python 3.6 compatibility objects/modules (#24048)
This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 46d26c76af43f0e5acfe299176c906414f69bcbd
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Wed Jun 1 08:15:56 2022 +0300
Drop Python 3.6 compatibility objects/modules (#24048)
(cherry picked from commit 1dccaad46b901189c1928cef8419f1ea1160d550)
---
airflow/compat/asyncio.py | 28 ----------------------------
airflow/jobs/triggerer_job.py | 5 ++---
airflow/models/dag.py | 6 +++---
airflow/typing_compat.py | 9 ---------
airflow/utils/log/secrets_masker.py | 4 +---
tests/triggers/test_temporal.py | 5 ++---
6 files changed, 8 insertions(+), 49 deletions(-)
diff --git a/airflow/compat/asyncio.py b/airflow/compat/asyncio.py
deleted file mode 100644
index a999ce604d..0000000000
--- a/airflow/compat/asyncio.py
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-try:
- from asyncio import create_task
-except ImportError:
- # create_task is not present in Python 3.6. Once Airflow is at 3.7+, we can
- # remove this helper.
- def create_task(*args, **kwargs): # type: ignore
- """A version of create_task that always errors."""
- raise RuntimeError("Airflow's async functionality is only available on Python 3.7+")
-
-
-__all__ = ["create_task"]
diff --git a/airflow/jobs/triggerer_job.py b/airflow/jobs/triggerer_job.py
index 2c0a0bd5bf..ac7d22a6b1 100644
--- a/airflow/jobs/triggerer_job.py
+++ b/airflow/jobs/triggerer_job.py
@@ -26,7 +26,6 @@ from typing import Deque, Dict, Set, Tuple, Type
from sqlalchemy import func
-from airflow.compat.asyncio import create_task
from airflow.configuration import conf
from airflow.jobs.base_job import BaseJob
from airflow.models.trigger import Trigger
@@ -236,7 +235,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
The loop in here runs trigger addition/deletion/cleanup. Actual
triggers run in their own separate coroutines.
"""
- watchdog = create_task(self.block_watchdog())
+ watchdog = asyncio.create_task(self.block_watchdog())
last_status = time.time()
while not self.stop:
# Run core logic
@@ -263,7 +262,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
trigger_id, trigger_instance = self.to_create.popleft()
if trigger_id not in self.triggers:
self.triggers[trigger_id] = {
- "task": create_task(self.run_trigger(trigger_id, trigger_instance)),
+ "task": asyncio.create_task(self.run_trigger(trigger_id, trigger_instance)),
"name": f"{trigger_instance!r} (ID {trigger_id})",
"events": 0,
}
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 823287dcb1..4370f36c3a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -78,7 +78,7 @@ from airflow.stats import Stats
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.timetables.simple import NullTimetable, OnceTimetable
-from airflow.typing_compat import Literal, RePatternType
+from airflow.typing_compat import Literal
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.dates import cron_presets, date_range as utils_date_range
@@ -1998,7 +1998,7 @@ class DAG(LoggingMixin):
def partial_subset(
self,
- task_ids_or_regex: Union[str, RePatternType, Iterable[str]],
+ task_ids_or_regex: Union[str, re.Pattern, Iterable[str]],
include_downstream=False,
include_upstream=True,
include_direct_upstream=False,
@@ -2026,7 +2026,7 @@ class DAG(LoggingMixin):
memo = {id(self.task_dict): None, id(self._task_group): None}
dag = copy.deepcopy(self, memo) # type: ignore
- if isinstance(task_ids_or_regex, (str, RePatternType)):
+ if isinstance(task_ids_or_regex, (str, re.Pattern)):
matched_tasks = [t for t in self.tasks if re.findall(task_ids_or_regex, t.task_id)]
else:
matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]
diff --git a/airflow/typing_compat.py b/airflow/typing_compat.py
index 9f1185f76c..163889b8a2 100644
--- a/airflow/typing_compat.py
+++ b/airflow/typing_compat.py
@@ -28,12 +28,3 @@ try:
from typing import Literal, Protocol, TypedDict, runtime_checkable # type: ignore
except ImportError:
from typing_extensions import Literal, Protocol, TypedDict, runtime_checkable # type: ignore # noqa
-
-
-# Before Py 3.7, there is no re.Pattern class
-try:
- from re import Pattern as RePatternType # type: ignore
-except ImportError:
- import re
-
- RePatternType = type(re.compile('', 0)) # type: ignore
diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py
index c4c1c390b2..de038be48b 100644
--- a/airflow/utils/log/secrets_masker.py
+++ b/airflow/utils/log/secrets_masker.py
@@ -24,8 +24,6 @@ from airflow import settings
from airflow.compat.functools import cache, cached_property
if TYPE_CHECKING:
- from airflow.typing_compat import RePatternType
-
RedactableItem = Union[str, Dict[Any, Any], Tuple[Any, ...], List[Any]]
@@ -115,7 +113,7 @@ def _secrets_masker() -> "SecretsMasker":
class SecretsMasker(logging.Filter):
"""Redact secrets from logs"""
- replacer: Optional["RePatternType"] = None
+ replacer: Optional[re.Pattern] = None
patterns: Set[str]
ALREADY_FILTERED_FLAG = "__SecretsMasker_filtered"
diff --git a/tests/triggers/test_temporal.py b/tests/triggers/test_temporal.py
index 8f2f222d71..1f22694fae 100644
--- a/tests/triggers/test_temporal.py
+++ b/tests/triggers/test_temporal.py
@@ -21,7 +21,6 @@ import datetime
import pendulum
import pytest
-from airflow.compat.asyncio import create_task
from airflow.triggers.base import TriggerEvent
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
@@ -72,7 +71,7 @@ async def test_datetime_trigger_timing():
# Create a task that runs the trigger for a short time then cancels it
trigger = DateTimeTrigger(future_moment)
- trigger_task = create_task(trigger.run().__anext__())
+ trigger_task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)
# It should not have produced a result
@@ -81,7 +80,7 @@ async def test_datetime_trigger_timing():
# Now, make one waiting for an event in the past and do it again
trigger = DateTimeTrigger(past_moment)
- trigger_task = create_task(trigger.run().__anext__())
+ trigger_task = asyncio.create_task(trigger.run().__anext__())
await asyncio.sleep(0.5)
assert trigger_task.done() is True