You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 14:46:46 UTC
[airflow] branch v1-10-test updated (90446e5 -> eceae90)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.
from 90446e5 Fix logging issue when running tasks (#9363)
new 873f476 [AIRFLOW-6856] Bulk fetch paused_dag_ids
new eceae90 [AIRFLOW-6957] Make retrieving Paused Dag ids a separate method
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
airflow/dag/base_dag.py | 8 --------
airflow/jobs/scheduler_job.py | 9 ++-------
airflow/models/dag.py | 20 ++++++++++++++++++++
airflow/utils/dag_processing.py | 9 ---------
tests/models/test_dag.py | 15 +++++++++++++++
5 files changed, 37 insertions(+), 24 deletions(-)
[airflow] 02/02: [AIRFLOW-6957] Make retrieving Paused Dag ids a separate method
Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit eceae90d28b8490499640763fa3ec3c0cb01c989
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Jun 29 16:43:45 2020 +0200
[AIRFLOW-6957] Make retrieving Paused Dag ids a separate method
(cherry picked from commit a887e0a1a02e12e00687ff123220de095e560647)
---
airflow/jobs/scheduler_job.py | 2 +-
airflow/models/dag.py | 20 ++++++++++++++++++++
tests/models/test_dag.py | 15 +++++++++++++++
3 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 0665779..5b06be8 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1266,7 +1266,7 @@ class SchedulerJob(BaseJob):
:param dagbag: a collection of DAGs to process
:type dagbag: airflow.models.DagBag
:param dags: the DAGs from the DagBag to process
- :type dags: airflow.models.DAG
+ :type dags: list[airflow.models.DAG]
:param tis_out: A list to add generated TaskInstance objects
:type tis_out: list[TaskInstance]
:rtype: None
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 3f46bf6..7759cb3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1785,6 +1785,26 @@ class DagModel(Base):
return get_last_dagrun(self.dag_id, session=session,
include_externally_triggered=include_externally_triggered)
+ @staticmethod
+ @provide_session
+ def get_paused_dag_ids(dag_ids, session):
+ """
+ Given a list of dag_ids, get a set of Paused Dag Ids
+
+ :param dag_ids: List of Dag ids
+ :param session: ORM Session
+ :return: Paused Dag_ids
+ """
+ paused_dag_ids = (
+ session.query(DagModel.dag_id)
+ .filter(DagModel.is_paused.is_(True))
+ .filter(DagModel.dag_id.in_(dag_ids))
+ .all()
+ )
+
+ paused_dag_ids = set(paused_dag_id for paused_dag_id, in paused_dag_ids)
+ return paused_dag_ids
+
@property
def safe_dag_id(self):
return self.dag_id.replace('.', '__dot__')
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index cdbe1ee..5d9d05d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -42,6 +42,7 @@ from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils import timezone
from airflow.utils.dag_processing import list_py_file_paths
+from airflow.utils.db import create_session
from airflow.utils.state import State
from airflow.utils.weight_rule import WeightRule
from tests.models import DEFAULT_DATE
@@ -941,3 +942,17 @@ class DagTest(unittest.TestCase):
assert issubclass(PendingDeprecationWarning, warning.category)
self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+ def test_get_paused_dag_ids(self):
+ dag_id = "test_get_paused_dag_ids"
+ dag = DAG(dag_id, is_paused_upon_creation=True)
+ dag.sync_to_db()
+ self.assertIsNotNone(DagModel.get_dagmodel(dag_id))
+
+ paused_dag_ids = DagModel.get_paused_dag_ids([dag_id])
+ self.assertEqual(paused_dag_ids, {dag_id})
+
+ with create_session() as session:
+ session.query(DagModel).filter(
+ DagModel.dag_id == dag_id).delete(
+ synchronize_session=False)
[airflow] 01/02: [AIRFLOW-6856] Bulk fetch paused_dag_ids
Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 873f47617c37243ff3d33e7b437ba2430e0417a5
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Jun 29 16:40:18 2020 +0200
[AIRFLOW-6856] Bulk fetch paused_dag_ids
(cherry picked from commit d031f844517a8d12e7d90af0c472ca00c64b8963)
---
airflow/dag/base_dag.py | 8 --------
airflow/jobs/scheduler_job.py | 7 +------
airflow/utils/dag_processing.py | 9 ---------
3 files changed, 1 insertion(+), 23 deletions(-)
diff --git a/airflow/dag/base_dag.py b/airflow/dag/base_dag.py
index 0e65775..6e556a3 100644
--- a/airflow/dag/base_dag.py
+++ b/airflow/dag/base_dag.py
@@ -64,14 +64,6 @@ class BaseDag(object):
raise NotImplementedError()
@abstractmethod
- def is_paused(self):
- """
- :return: whether this DAG is paused or not
- :rtype: bool
- """
- raise NotImplementedError()
-
- @abstractmethod
def pickle_id(self):
"""
:return: The pickle ID for this DAG, if it has one. Otherwise None.
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9da73b7..0665779 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1277,10 +1277,6 @@ class SchedulerJob(BaseJob):
self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
continue
- if dag.is_paused:
- self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
- continue
-
self.log.info("Processing %s", dag.dag_id)
dag_run = self.create_dag_run(dag)
@@ -1581,8 +1577,7 @@ class SchedulerJob(BaseJob):
for dag in dagbag.dags.values():
dag.sync_to_db()
- paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values()
- if dag.is_paused]
+ paused_dag_ids = models.DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
# Pickle the DAGs (if necessary) and put them into a SimpleDag
for dag_id in dagbag.dags:
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index c888726..6e4e045 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -76,7 +76,6 @@ class SimpleDag(BaseDag):
self._dag_id = dag.dag_id
self._task_ids = [task.task_id for task in dag.tasks]
self._full_filepath = dag.full_filepath
- self._is_paused = dag.is_paused
self._concurrency = dag.concurrency
self._pickle_id = pickle_id
self._task_special_args = {}
@@ -120,14 +119,6 @@ class SimpleDag(BaseDag):
return self._concurrency
@property
- def is_paused(self):
- """
- :return: whether this DAG is paused or not
- :rtype: bool
- """
- return self._is_paused
-
- @property
def pickle_id(self):
"""
:return: The pickle ID for this DAG, if it has one. Otherwise None.