You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@airflow.apache.org by po...@apache.org on 2020/06/29 14:46:46 UTC

[airflow] branch v1-10-test updated (90446e5 -> eceae90)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


    from 90446e5  Fix logging issue when running tasks (#9363)
     new 873f476  [AIRFLOW-6856] Bulk fetch paused_dag_ids
     new eceae90  [AIRFLOW-6957] Make retrieving Paused Dag ids a separate method

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 airflow/dag/base_dag.py         |  8 --------
 airflow/jobs/scheduler_job.py   |  9 ++-------
 airflow/models/dag.py           | 20 ++++++++++++++++++++
 airflow/utils/dag_processing.py |  9 ---------
 tests/models/test_dag.py        | 15 +++++++++++++++
 5 files changed, 37 insertions(+), 24 deletions(-)


[airflow] 02/02: [AIRFLOW-6957] Make retrieving Paused Dag ids a separate method

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit eceae90d28b8490499640763fa3ec3c0cb01c989
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Jun 29 16:43:45 2020 +0200

    [AIRFLOW-6957] Make retrieving Paused Dag ids a separate method
    
       (cherry picked from commit a887e0a1a02e12e00687ff123220de095e560647)
---
 airflow/jobs/scheduler_job.py |  2 +-
 airflow/models/dag.py         | 20 ++++++++++++++++++++
 tests/models/test_dag.py      | 15 +++++++++++++++
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 0665779..5b06be8 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1266,7 +1266,7 @@ class SchedulerJob(BaseJob):
         :param dagbag: a collection of DAGs to process
         :type dagbag: airflow.models.DagBag
         :param dags: the DAGs from the DagBag to process
-        :type dags: airflow.models.DAG
+        :type dags: list[airflow.models.DAG]
         :param tis_out: A list to add generated TaskInstance objects
         :type tis_out: list[TaskInstance]
         :rtype: None
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 3f46bf6..7759cb3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1785,6 +1785,26 @@ class DagModel(Base):
         return get_last_dagrun(self.dag_id, session=session,
                                include_externally_triggered=include_externally_triggered)
 
+    @staticmethod
+    @provide_session
+    def get_paused_dag_ids(dag_ids, session):
+        """
+        Given a list of dag_ids, get a set of Paused Dag Ids
+
+        :param dag_ids: List of Dag ids
+        :param session: ORM Session
+        :return: Paused Dag_ids
+        """
+        paused_dag_ids = (
+            session.query(DagModel.dag_id)
+            .filter(DagModel.is_paused.is_(True))
+            .filter(DagModel.dag_id.in_(dag_ids))
+            .all()
+        )
+
+        paused_dag_ids = set(paused_dag_id for paused_dag_id, in paused_dag_ids)
+        return paused_dag_ids
+
     @property
     def safe_dag_id(self):
         return self.dag_id.replace('.', '__dot__')
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index cdbe1ee..5d9d05d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -42,6 +42,7 @@ from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils import timezone
 from airflow.utils.dag_processing import list_py_file_paths
+from airflow.utils.db import create_session
 from airflow.utils.state import State
 from airflow.utils.weight_rule import WeightRule
 from tests.models import DEFAULT_DATE
@@ -941,3 +942,17 @@ class DagTest(unittest.TestCase):
             assert issubclass(PendingDeprecationWarning, warning.category)
 
             self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+    def test_get_paused_dag_ids(self):
+        dag_id = "test_get_paused_dag_ids"
+        dag = DAG(dag_id, is_paused_upon_creation=True)
+        dag.sync_to_db()
+        self.assertIsNotNone(DagModel.get_dagmodel(dag_id))
+
+        paused_dag_ids = DagModel.get_paused_dag_ids([dag_id])
+        self.assertEqual(paused_dag_ids, {dag_id})
+
+        with create_session() as session:
+            session.query(DagModel).filter(
+                DagModel.dag_id == dag_id).delete(
+                synchronize_session=False)


[airflow] 01/02: [AIRFLOW-6856] Bulk fetch paused_dag_ids

Posted by po...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 873f47617c37243ff3d33e7b437ba2430e0417a5
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Mon Jun 29 16:40:18 2020 +0200

    [AIRFLOW-6856] Bulk fetch paused_dag_ids
    
      (cherry picked from commit d031f844517a8d12e7d90af0c472ca00c64b8963)
---
 airflow/dag/base_dag.py         | 8 --------
 airflow/jobs/scheduler_job.py   | 7 +------
 airflow/utils/dag_processing.py | 9 ---------
 3 files changed, 1 insertion(+), 23 deletions(-)

diff --git a/airflow/dag/base_dag.py b/airflow/dag/base_dag.py
index 0e65775..6e556a3 100644
--- a/airflow/dag/base_dag.py
+++ b/airflow/dag/base_dag.py
@@ -64,14 +64,6 @@ class BaseDag(object):
         raise NotImplementedError()
 
     @abstractmethod
-    def is_paused(self):
-        """
-        :return: whether this DAG is paused or not
-        :rtype: bool
-        """
-        raise NotImplementedError()
-
-    @abstractmethod
     def pickle_id(self):
         """
         :return: The pickle ID for this DAG, if it has one. Otherwise None.
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 9da73b7..0665779 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1277,10 +1277,6 @@ class SchedulerJob(BaseJob):
                 self.log.error("DAG ID %s was not found in the DagBag", dag.dag_id)
                 continue
 
-            if dag.is_paused:
-                self.log.info("Not processing DAG %s since it's paused", dag.dag_id)
-                continue
-
             self.log.info("Processing %s", dag.dag_id)
 
             dag_run = self.create_dag_run(dag)
@@ -1581,8 +1577,7 @@ class SchedulerJob(BaseJob):
         for dag in dagbag.dags.values():
             dag.sync_to_db()
 
-        paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values()
-                          if dag.is_paused]
+        paused_dag_ids = models.DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)
 
         # Pickle the DAGs (if necessary) and put them into a SimpleDag
         for dag_id in dagbag.dags:
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
index c888726..6e4e045 100644
--- a/airflow/utils/dag_processing.py
+++ b/airflow/utils/dag_processing.py
@@ -76,7 +76,6 @@ class SimpleDag(BaseDag):
         self._dag_id = dag.dag_id
         self._task_ids = [task.task_id for task in dag.tasks]
         self._full_filepath = dag.full_filepath
-        self._is_paused = dag.is_paused
         self._concurrency = dag.concurrency
         self._pickle_id = pickle_id
         self._task_special_args = {}
@@ -120,14 +119,6 @@ class SimpleDag(BaseDag):
         return self._concurrency
 
     @property
-    def is_paused(self):
-        """
-        :return: whether this DAG is paused or not
-        :rtype: bool
-        """
-        return self._is_paused
-
-    @property
     def pickle_id(self):
         """
         :return: The pickle ID for this DAG, if it has one. Otherwise None.