You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:13 UTC

[airflow] 11/42: Rename last_scheduler_run into last_parsed_time, and ensure it's updated in DB (#14581)

This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7156d6cdbc9882999eecfc19f791168dd9bc1d88
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Fri Mar 5 23:05:32 2021 +0100

    Rename last_scheduler_run into last_parsed_time, and ensure it's updated in DB (#14581)
    
    - Fix functionality:
      updating last_scheduler_run was missed in the process of
      migrating from sync_to_db/bulk_sync_to_db to bulk_write_to_db.
    
      This issue causes the DAG.deactivate_stale_dags() method to fail,
      and blocks users from checking the last schedule time of each DAG in the DB.
    
    - Rename last_scheduler_run to last_parsed_time
      to better reflect what it does now.
      A migration script is added, and the codebase is updated.
    
    - To ensure the migration scripts can work,
      we have to limit the columns queried in create_dag_specific_permissions() to dag_id,
      so that migration 2c6edca13270 can still work with the renamed column.
    
    Co-authored-by: Kaxil Naik <ka...@gmail.com>
    (cherry picked from commit c2a0cb958835d0cecd90f82311e2aa8b1bbd22a0)
---
 ...e42bb497a22_rename_last_scheduler_run_column.py | 65 ++++++++++++++++++++++
 airflow/models/dag.py                              |  7 ++-
 airflow/www/security.py                            |  9 +--
 airflow/www/views.py                               |  2 +-
 tests/models/test_dag.py                           | 15 +++--
 5 files changed, 86 insertions(+), 12 deletions(-)

diff --git a/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py b/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py
new file mode 100644
index 0000000..97d8ff6
--- /dev/null
+++ b/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""rename last_scheduler_run column
+
+Revision ID: 2e42bb497a22
+Revises: 8646922c8a04
+Create Date: 2021-03-04 19:50:38.880942
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '2e42bb497a22'
+down_revision = '8646922c8a04'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    """Apply rename last_scheduler_run column"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mssql":
+        with op.batch_alter_table('dag') as batch_op:
+            batch_op.alter_column(
+                'last_scheduler_run', new_column_name='last_parsed_time', type_=mssql.DATETIME2(precision=6)
+            )
+    else:
+        with op.batch_alter_table('dag') as batch_op:
+            batch_op.alter_column(
+                'last_scheduler_run', new_column_name='last_parsed_time', type_=sa.TIMESTAMP(timezone=True)
+            )
+
+
+def downgrade():
+    """Unapply rename last_scheduler_run column"""
+    conn = op.get_bind()
+    if conn.dialect.name == "mssql":
+        with op.batch_alter_table('dag') as batch_op:
+            batch_op.alter_column(
+                'last_parsed_time', new_column_name='last_scheduler_run', type_=mssql.DATETIME2(precision=6)
+            )
+    else:
+        with op.batch_alter_table('dag') as batch_op:
+            batch_op.alter_column(
+                'last_parsed_time', new_column_name='last_scheduler_run', type_=sa.TIMESTAMP(timezone=True)
+            )
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d77cdfc..47fc34b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1882,6 +1882,7 @@ class DAG(LoggingMixin):
                 orm_dag.fileloc = dag.fileloc
                 orm_dag.owners = dag.owner
             orm_dag.is_active = True
+            orm_dag.last_parsed_time = timezone.utcnow()
             orm_dag.default_view = dag.default_view
             orm_dag.description = dag.description
             orm_dag.schedule_interval = dag.schedule_interval
@@ -1966,13 +1967,13 @@ class DAG(LoggingMixin):
         """
         for dag in (
             session.query(DagModel)
-            .filter(DagModel.last_scheduler_run < expiration_date, DagModel.is_active)
+            .filter(DagModel.last_parsed_time < expiration_date, DagModel.is_active)
             .all()
         ):
             log.info(
                 "Deactivating DAG ID %s since it was last touched by the scheduler at %s",
                 dag.dag_id,
-                dag.last_scheduler_run.isoformat(),
+                dag.last_parsed_time.isoformat(),
             )
             dag.is_active = False
             session.merge(dag)
@@ -2075,7 +2076,7 @@ class DagModel(Base):
     # Whether that DAG was seen on the last DagBag load
     is_active = Column(Boolean, default=False)
     # Last time the scheduler started
-    last_scheduler_run = Column(UtcDateTime)
+    last_parsed_time = Column(UtcDateTime)
     # Last time this DAG was pickled
     last_pickled = Column(UtcDateTime)
     # Time when the DAG last received a refresh signal
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 09af167..5201ef6 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -474,15 +474,16 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
         :return: None.
         """
         perms = self.get_all_permissions()
-        dag_models = (
-            session.query(models.DagModel)
+        rows = (
+            session.query(models.DagModel.dag_id)
             .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
             .all()
         )
 
-        for dag in dag_models:
+        for row in rows:
+            dag_id = row[0]
             for perm_name in self.DAG_PERMS:
-                dag_resource_name = self.prefixed_dag_id(dag.dag_id)
+                dag_resource_name = self.prefixed_dag_id(dag_id)
                 if dag_resource_name and perm_name and (dag_resource_name, perm_name) not in perms:
                     self._merge_perm(perm_name, dag_resource_name)
 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b7c5372..c1155ee 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3729,7 +3729,7 @@ class DagModelView(AirflowModelView):
     list_columns = [
         'dag_id',
         'is_paused',
-        'last_scheduler_run',
+        'last_parsed_time',
         'last_expired',
         'scheduler_lock',
         'fileloc',
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index c923241..0aae371 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -646,15 +646,19 @@ class TestDag(unittest.TestCase):
                 ('dag-bulk-sync-2', 'test-dag'),
                 ('dag-bulk-sync-3', 'test-dag'),
             } == set(session.query(DagTag.dag_id, DagTag.name).all())
+
+            for row in session.query(DagModel.last_parsed_time).all():
+                assert row[0] is not None
+
         # Re-sync should do fewer queries
-        with assert_queries_count(3):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
-        with assert_queries_count(3):
+        with assert_queries_count(4):
             DAG.bulk_write_to_db(dags)
         # Adding tags
         for dag in dags:
             dag.tags.append("test-dag2")
-        with assert_queries_count(4):
+        with assert_queries_count(5):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
@@ -673,7 +677,7 @@ class TestDag(unittest.TestCase):
         # Removing tags
         for dag in dags:
             dag.tags.remove("test-dag")
-        with assert_queries_count(4):
+        with assert_queries_count(5):
             DAG.bulk_write_to_db(dags)
         with create_session() as session:
             assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
@@ -686,6 +690,9 @@ class TestDag(unittest.TestCase):
                 ('dag-bulk-sync-3', 'test-dag2'),
             } == set(session.query(DagTag.dag_id, DagTag.name).all())
 
+            for row in session.query(DagModel.last_parsed_time).all():
+                assert row[0] is not None
+
     def test_bulk_write_to_db_max_active_runs(self):
         """
         Test that DagModel.next_dagrun_create_after is set to NULL when the dag cannot be created due to max