You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by as...@apache.org on 2021/03/19 15:06:13 UTC
[airflow] 11/42: Rename last_scheduler_run into last_parsed_time,
and ensure it's updated in DB (#14581)
This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7156d6cdbc9882999eecfc19f791168dd9bc1d88
Author: Xiaodong DENG <xd...@apache.org>
AuthorDate: Fri Mar 5 23:05:32 2021 +0100
Rename last_scheduler_run into last_parsed_time, and ensure it's updated in DB (#14581)
- Fix functionality
Updating last_scheduler_run was missed in the process of
migrating from sync_to_db/bulk_sync_to_db to bulk_write_to_db.
This causes the DAG.deactivate_stale_dags() method to fail,
and prevents users from checking the last schedule time of each DAG in the DB
- Change name last_scheduler_run to last_parsed_time,
to better reflect what it does now.
A migration script is added, and the codebase is updated accordingly
- To ensure the migration scripts can work,
we have to limit the columns needed in create_dag_specific_permissions(),
so migration 2c6edca13270 can work with the renamed column.
Co-authored-by: Kaxil Naik <ka...@gmail.com>
(cherry picked from commit c2a0cb958835d0cecd90f82311e2aa8b1bbd22a0)
---
...e42bb497a22_rename_last_scheduler_run_column.py | 65 ++++++++++++++++++++++
airflow/models/dag.py | 7 ++-
airflow/www/security.py | 9 +--
airflow/www/views.py | 2 +-
tests/models/test_dag.py | 15 +++--
5 files changed, 86 insertions(+), 12 deletions(-)
diff --git a/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py b/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py
new file mode 100644
index 0000000..97d8ff6
--- /dev/null
+++ b/airflow/migrations/versions/2e42bb497a22_rename_last_scheduler_run_column.py
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""rename last_scheduler_run column
+
+Revision ID: 2e42bb497a22
+Revises: 8646922c8a04
+Create Date: 2021-03-04 19:50:38.880942
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mssql
+
+# revision identifiers, used by Alembic.
+revision = '2e42bb497a22'
+down_revision = '8646922c8a04'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ """Apply rename last_scheduler_run column"""
+ conn = op.get_bind()
+ if conn.dialect.name == "mssql":
+ with op.batch_alter_table('dag') as batch_op:
+ batch_op.alter_column(
+ 'last_scheduler_run', new_column_name='last_parsed_time', type_=mssql.DATETIME2(precision=6)
+ )
+ else:
+ with op.batch_alter_table('dag') as batch_op:
+ batch_op.alter_column(
+ 'last_scheduler_run', new_column_name='last_parsed_time', type_=sa.TIMESTAMP(timezone=True)
+ )
+
+
+def downgrade():
+ """Unapply rename last_scheduler_run column"""
+ conn = op.get_bind()
+ if conn.dialect.name == "mssql":
+ with op.batch_alter_table('dag') as batch_op:
+ batch_op.alter_column(
+ 'last_parsed_time', new_column_name='last_scheduler_run', type_=mssql.DATETIME2(precision=6)
+ )
+ else:
+ with op.batch_alter_table('dag') as batch_op:
+ batch_op.alter_column(
+ 'last_parsed_time', new_column_name='last_scheduler_run', type_=sa.TIMESTAMP(timezone=True)
+ )
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d77cdfc..47fc34b 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1882,6 +1882,7 @@ class DAG(LoggingMixin):
orm_dag.fileloc = dag.fileloc
orm_dag.owners = dag.owner
orm_dag.is_active = True
+ orm_dag.last_parsed_time = timezone.utcnow()
orm_dag.default_view = dag.default_view
orm_dag.description = dag.description
orm_dag.schedule_interval = dag.schedule_interval
@@ -1966,13 +1967,13 @@ class DAG(LoggingMixin):
"""
for dag in (
session.query(DagModel)
- .filter(DagModel.last_scheduler_run < expiration_date, DagModel.is_active)
+ .filter(DagModel.last_parsed_time < expiration_date, DagModel.is_active)
.all()
):
log.info(
"Deactivating DAG ID %s since it was last touched by the scheduler at %s",
dag.dag_id,
- dag.last_scheduler_run.isoformat(),
+ dag.last_parsed_time.isoformat(),
)
dag.is_active = False
session.merge(dag)
@@ -2075,7 +2076,7 @@ class DagModel(Base):
# Whether that DAG was seen on the last DagBag load
is_active = Column(Boolean, default=False)
# Last time the scheduler started
- last_scheduler_run = Column(UtcDateTime)
+ last_parsed_time = Column(UtcDateTime)
# Last time this DAG was pickled
last_pickled = Column(UtcDateTime)
# Time when the DAG last received a refresh signal
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 09af167..5201ef6 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -474,15 +474,16 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): # pylint: disable=
:return: None.
"""
perms = self.get_all_permissions()
- dag_models = (
- session.query(models.DagModel)
+ rows = (
+ session.query(models.DagModel.dag_id)
.filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
.all()
)
- for dag in dag_models:
+ for row in rows:
+ dag_id = row[0]
for perm_name in self.DAG_PERMS:
- dag_resource_name = self.prefixed_dag_id(dag.dag_id)
+ dag_resource_name = self.prefixed_dag_id(dag_id)
if dag_resource_name and perm_name and (dag_resource_name, perm_name) not in perms:
self._merge_perm(perm_name, dag_resource_name)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b7c5372..c1155ee 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3729,7 +3729,7 @@ class DagModelView(AirflowModelView):
list_columns = [
'dag_id',
'is_paused',
- 'last_scheduler_run',
+ 'last_parsed_time',
'last_expired',
'scheduler_lock',
'fileloc',
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index c923241..0aae371 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -646,15 +646,19 @@ class TestDag(unittest.TestCase):
('dag-bulk-sync-2', 'test-dag'),
('dag-bulk-sync-3', 'test-dag'),
} == set(session.query(DagTag.dag_id, DagTag.name).all())
+
+ for row in session.query(DagModel.last_parsed_time).all():
+ assert row[0] is not None
+
# Re-sync should do fewer queries
- with assert_queries_count(3):
+ with assert_queries_count(4):
DAG.bulk_write_to_db(dags)
- with assert_queries_count(3):
+ with assert_queries_count(4):
DAG.bulk_write_to_db(dags)
# Adding tags
for dag in dags:
dag.tags.append("test-dag2")
- with assert_queries_count(4):
+ with assert_queries_count(5):
DAG.bulk_write_to_db(dags)
with create_session() as session:
assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
@@ -673,7 +677,7 @@ class TestDag(unittest.TestCase):
# Removing tags
for dag in dags:
dag.tags.remove("test-dag")
- with assert_queries_count(4):
+ with assert_queries_count(5):
DAG.bulk_write_to_db(dags)
with create_session() as session:
assert {'dag-bulk-sync-0', 'dag-bulk-sync-1', 'dag-bulk-sync-2', 'dag-bulk-sync-3'} == {
@@ -686,6 +690,9 @@ class TestDag(unittest.TestCase):
('dag-bulk-sync-3', 'test-dag2'),
} == set(session.query(DagTag.dag_id, DagTag.name).all())
+ for row in session.query(DagModel.last_parsed_time).all():
+ assert row[0] is not None
+
def test_bulk_write_to_db_max_active_runs(self):
"""
Test that DagModel.next_dagrun_create_after is set to NULL when the dag cannot be created due to max